pyspark: get the table name from a Spark DataFrame

Asked by 00jrzges on 2022-12-22 in Spark

If I create a DataFrame as follows:

df = spark.table("tblName")

Is there any way to recover tblName from df?


o7jaxewo (answer #1)

You can extract it from the logical plan:

df.logicalPlan().argString().replace("`","")

jogvjijk (answer #2)

We can extract the table name from a DataFrame by walking its logical plan (and its optimized plan) and matching on the relation nodes, as follows:

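import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation

def getTableName(df: DataFrame): String = {
  // Look in both the logical plan and the optimized plan
  Seq(df.queryExecution.logical, df.queryExecution.optimizedPlan).flatMap {
    _.collect {
      // Data source tables (e.g. Parquet) carry an Option[CatalogTable]
      case LogicalRelation(_, _, Some(catalogTable), _) => catalogTable.identifier.toString
      // Hive SerDe tables
      case hive: HiveTableRelation => hive.tableMeta.identifier.toString
    }
  }.head // throws NoSuchElementException if no table relation is found
}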
scala> val df = spark.table("db.table")

scala> getTableName(df)
res: String = `db`.`table`
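The identifier comes back quoted as `db`.`table` (answer #1 strips the backticks with a plain replace). If you need the database and table as separate values, a minimal pure-Python sketch could split the quoted form. The helper name `split_quoted_identifier` is hypothetical, and it assumes either every part is backtick-quoted or none is, with a literal backtick inside a name written as a doubled backtick (the way Spark quotes identifiers).

```python
import re
from typing import List

# One backtick-quoted part; a literal backtick inside a name is written as ``.
_QUOTED_PART = re.compile(r"`((?:[^`]|``)*)`")


def split_quoted_identifier(identifier: str) -> List[str]:
    """Hypothetical helper: split a string like `db`.`table` into ['db', 'table']."""
    parts = [m.group(1).replace("``", "`") for m in _QUOTED_PART.finditer(identifier)]
    # Fall back to a plain dot split when the name was not quoted at all
    return parts if parts else identifier.split(".")


print(split_quoted_identifier("`db`.`table`"))
```

Unlike `replace("`", "")`, this keeps names that themselves contain dots intact, as long as they are quoted.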

6jygbczu (answer #3)

The following utility function may help determine the table name of a given DataFrame.

import re
import typing

import pyspark.sql


def get_dataframe_tablename(df: pyspark.sql.DataFrame) -> typing.Optional[str]:
    """
    If the dataframe was created from an underlying table (e.g. spark.table('dual') or
    spark.sql("select * from dual")), this function returns the
    fully qualified table name (e.g. `default`.`dual`); otherwise it returns None.

    Tested on: python 3.7, spark 3.0.1, but it should work with Spark >=2.x and python >=3.4 too
    Examples:
        >>> get_dataframe_tablename(spark.table('dual'))
        `default`.`dual`
        >>> get_dataframe_tablename(spark.sql("select * from dual"))
        `default`.`dual`

    It inspects the output of `df.explain()` to determine whether the df was created from a table.
    Note: the regex only matches Hive table scans ("Scan hive ..."); tables read through a
    file source (shown as "FileScan ..." in the plan) yield None.
    :param df: input dataframe whose underlying table name will be returned
    :return:  table name or None
    """

    def _explain(_df: pyspark.sql.DataFrame) -> str:
        # df.explain() has no parameter for an output stream; it prints to stdout,
        # so capture stdout instead
        import contextlib
        import io
        with contextlib.redirect_stdout(io.StringIO()) as f:
            _df.explain()
        f.seek(0)  # Rewind stream position
        explanation = f.readlines()[1]  # Ignore first output line(#Physical Plan...)
        return explanation

    pattern = re.compile("Scan hive (.+), HiveTableRelation (.+?), (.+)")
    output = _explain(df)
    match = pattern.search(output)
    return match.group(2) if match else None
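The stdout-capturing trick inside `_explain` works for any function that prints its result instead of returning it. A minimal generic sketch follows; the name `capture_stdout` is hypothetical, and it uses `StringIO.getvalue()`, which returns the whole buffer, so no `seek(0)` is needed.

```python
import contextlib
import io
from typing import Any, Callable


def capture_stdout(func: Callable[..., Any], *args: Any, **kwargs: Any) -> str:
    """Hypothetical helper: run func and return everything it printed to stdout."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        func(*args, **kwargs)
    # getvalue() returns the full contents regardless of the stream position
    return buffer.getvalue()


def fake_explain() -> None:
    # Stand-in for df.explain(), which prints the plan rather than returning it
    print("== Physical Plan ==")
    print("second line of the plan")


print(capture_stdout(fake_explain).splitlines()[1])
```

With a real DataFrame this would be `capture_stdout(df.explain)`, after which the same regex can be applied to the captured text.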

z9zf31ra (answer #4)

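The following three lines give the table and database name (the cast only succeeds when the root of the physical plan is a FileSourceScanExec, i.e. a plain scan of a file-based table; otherwise it throws a ClassCastException):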

import org.apache.spark.sql.execution.FileSourceScanExec
val df = session.table("dealer")  // session is a SparkSession
df.queryExecution.sparkPlan.asInstanceOf[FileSourceScanExec].tableIdentifier  // Option[TableIdentifier]

4si2a6ki (answer #5)

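Has this question been answered? I found a way, though it may not be the best one. You can get at the table name by retrieving the physical execution plan and then doing some string splitting on it.
Assuming you have a table database_name.tblName, the following code should work: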

execution_plan = df._jdf.queryExecution().simpleString()  # _jdf is the underlying Java Dataset
table_name = execution_plan.split('FileScan')[1].split('[')[0].split('.')[1]

The first line returns the execution plan as a string, which looks similar to:

== Physical Plan ==\n*(1) ColumnarToRow\n+- FileScan parquet database_name.tblName[column1#2880,column2ban#2881] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex[dbfs:/mnt/lake/database_name/table_name], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<column1:string,column2:string...\n\n'

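After that, you can run a few string splits to get at the relevant information. The first split, on FileScan, separates the plan into parts, and you want the second one; the next split is on [, where you want the first part (everything before the column list); the final split on . separates the database name from the table name.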
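The chained splits can also be expressed as one regular expression, which returns None instead of raising an IndexError when the plan has no FileScan node or the name is unqualified. This is a minimal sketch assuming the plan text has the `FileScan <format> <db>.<table>[...]` shape shown above; the name `table_from_plan` is hypothetical.

```python
import re
from typing import Optional, Tuple

# Matches e.g. "FileScan parquet database_name.tblName[" and captures the
# qualified name between the file format and the opening bracket.
_FILESCAN_RE = re.compile(r"FileScan\s+\w+\s+([^\s\[]+)\[")


def table_from_plan(plan: str) -> Optional[Tuple[Optional[str], str]]:
    """Hypothetical helper: return (database, table) from the first FileScan node, or None."""
    match = _FILESCAN_RE.search(plan)
    if match is None:
        return None
    # rpartition keeps everything before the last dot as the database part
    database, _, table = match.group(1).rpartition(".")
    return (database or None, table)


plan = ("== Physical Plan ==\n*(1) ColumnarToRow\n"
        "+- FileScan parquet database_name.tblName[column1#2880,column2ban#2881] "
        "Batched: true, DataFilters: [], Format: Parquet")
print(table_from_plan(plan))
```

In PySpark the input would be the string from `df._jdf.queryExecution().simpleString()`; since `_jdf` is an internal attribute and plan formatting can change between Spark versions, treat this as a best-effort parse.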


k3bvogb1 (answer #6)

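You can create a table from df. But if the table is a local or global temporary view, you must drop it (sqlContext.dropTempTable) before creating a view with the same name, or use the create-or-replace methods (df.createOrReplaceGlobalTempView or df.createOrReplaceTempView). If the table was registered with registerTempTable, you can register a table with the same name again without error, because registerTempTable replaces any existing view.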

#Create data frame
>>> d = [('Alice', 1)]
>>> test_df = spark.createDataFrame(sc.parallelize(d), ['name','age'])
>>> test_df.show()
+-----+---+
| name|age|
+-----+---+
|Alice|  1|
+-----+---+

#create tables
>>> test_df.createTempView("tbl1")
>>> test_df.registerTempTable("tbl2")
>>> sqlContext.tables().show()
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |     tbl1|       true|
|        |     tbl2|       true|
+--------+---------+-----------+

#create data frame from tbl1
>>> df = spark.table("tbl1")
>>> df.show()
+-----+---+
| name|age|
+-----+---+
|Alice|  1|
+-----+---+

#create tbl1 again using the df DataFrame; this raises an error
>>> df.createTempView("tbl1")
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "Temporary view 'tbl1' already exists;"

#drop and create again
>>> sqlContext.dropTempTable('tbl1')
>>> df.createTempView("tbl1")
>>> spark.sql('select * from tbl1').show()
+-----+---+
| name|age|
+-----+---+
|Alice|  1|
+-----+---+

#create data frame from tbl2 and replace name value
>>> df = spark.table("tbl2")
>>> df = df.replace('Alice', 'Bob')
>>> df.show()
+----+---+
|name|age|
+----+---+
| Bob|  1|
+----+---+

#create tbl2 again using the df DataFrame
>>> df.registerTempTable("tbl2")
>>> spark.sql('select * from tbl2').show()
+----+---+
|name|age|
+----+---+
| Bob|  1|
+----+---+
