PySpark: list all column names of every table in a database

ibps3vxo  posted on 2022-12-22  in Spark

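My task is to list all column names of every table in a database named "trial_db".
I can get all the table names with the first snippet below, but I am trying to include the column names as well. My attempt fails with this error: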

AnalysisException: Table or view not found: countrycurrency_csv; line 1 pos 9;
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<command-1607350> in <module>
     16   # For each table, get list of columns
     17   for table in tables:
---> 18     columns = [c.name for c in spark.sql(f"describe {table}").collect()]
     19     # Create a dataframe with database, table, and columns information
     20     df = spark.createDataFrame([(db.databaseName, table, columns)], schema=['database', 'table', 'columns'])

/databricks/spark/python/pyspark/sql/session.py in sql(self, sqlQuery)
    775         [Row(f1=1, f2='row1'), Row(f1=2, f2='row2'), Row(f1=3, f2='row3')]
    776         """
--> 777         return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
    778 
    779     def table(self, tableName):

/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)

The code that lists the table names:

#Creating an empty DF (this is kind of a hack...)
tbl_df = spark.sql("show tables in trial_db like 'xxx'")
#Loop through all databases
for db in spark.sql("show databases like 'trial_db'").collect():
  #create a dataframe with list of tables from the database
  df = spark.sql(f"show tables in {db.databaseName}")
  #union the tables list dataframe with main dataframe 
  tbl_df = tbl_df.union(df)
  
#After the loop, show the results
tbl_df.show()

I want to include the column names for each table.
I tried the following, but it raises the error shown above:

def create_df(table_df):
    table_df= table_df.withColumn("database", df.database.cast(String))
    table_df= table_df.withColumn("table", df.table.cast(String))
    table_df= table_df.withColumn("columns", df.columns.cast(string))

    return table_df

# Loop through all databases
for db in spark.sql("show databases like 'trial_db'").collect():
  # Get list of tables in the database
  tables = spark.sql(f"show tables in {db.databaseName}").rdd.map(lambda row: row.tableName).collect()
  
  # For each table, get list of columns
  for table in tables:
    columns = [c.name for c in spark.sql(f"describe {table}").collect()]
    # Create a dataframe with database, table, and columns information
    df = spark.createDataFrame([(db.databaseName, table, columns)], schema=['database', 'table', 'columns'])
    # Union the dataframe with main dataframe 
    tbl_df = tbl_df.union(df)

# After the loop, show the results
tbl_df.show()

ogq8wdun1#

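You are not far from a solution. You get the error because you did not specify which database contains the table you run DESCRIBE on. In addition, your first "hack" query produces the following schema: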

#Creating an empty DF (this is kind of a hack...)
tbl_df = spark.sql("show tables in trial_db like 'xxx'")

tbl_df.printSchema()
root
 |-- database: string (nullable = false)
 |-- tableName: string (nullable = false)
 |-- isTemporary: boolean (nullable = false)

whereas the target schema is:

root
 |-- database: string (nullable = true)
 |-- table: string (nullable = true)
 |-- columns: array (nullable = true)
 |    |-- element: string (containsNull = true)

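because the "columns" column will be an array of strings holding the column names. The two schemas differ, so the union with the "hack" DataFrame cannot produce the result you want.
My solution is: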

#Creating an empty DF with defined schema (instead of your "Hack")
schema = "database string, table string, columns array<string>"
tbl_df = spark.createDataFrame([],schema)
 
# Loop through all databases
for db in spark.sql("show databases like 'trial_db'").collect():
  # Get list of tables in the database
  tables = spark.sql(f"show tables in {db.databaseName}").rdd.map(lambda row: row.tableName).collect()
  
  # For each table, get list of columns
  for table in tables:
    # You need to specify which database you want to use before describing the table. 
    # Especially if you are looping on several databases.
    spark.sql(f"use {db.databaseName}") 
    
    # Create an array of columns
    columns = [c.col_name for c in spark.sql(f"describe {table}").collect()]
    # Create a dataframe with database, table, and columns information
    df = spark.createDataFrame([(db.databaseName, table, columns)], schema=['database', 'table', 'columns'])
    # Union the dataframe with main dataframe 
    tbl_df = tbl_df.union(df)

Here is an example of the result, using the public "databricks" database.

# After the loop, show the results
tbl_df.show()

+----------+--------------+--------------------+
|  database|         table|             columns|
+----------+--------------+--------------------+
|databricks| airlineflight|[Year, Month, Day...|
|databricks|  airlineplane|[tailnum, type, m...|
|databricks|bikesharingday|[instant, dteday,...|
|databricks|      citydata|[rankIn2016, stat...|
|databricks|databricksblog|[authors, categor...|
|databricks|     ipgeocode|[startingIP, endi...|
|databricks|     people10m|[id, firstName, m...|
|databricks|      ssanames|[firstName, gende...|
+----------+--------------+--------------------+
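
The loop above switches the current database with USE before each DESCRIBE. As a variation, the table name can be qualified with its database directly in the DESCRIBE statement, which avoids changing session state. The sketch below is only an illustration: `quote` and `describe_columns` are hypothetical helper names, `spark` is assumed to be an active SparkSession, and the early exit assumes that any extra metadata rows DESCRIBE returns (for example the partition section of a partitioned table) start with a blank or a "#"-prefixed `col_name`.

```python
def quote(identifier):
    """Backtick-quote an identifier so unusual names survive in SQL text."""
    return "`" + identifier.replace("`", "``") + "`"


def describe_columns(spark, database, table):
    """Return the column names of database.table using a qualified DESCRIBE.

    `spark` is assumed to be an active SparkSession (hypothetical helper).
    """
    query = f"DESCRIBE {quote(database)}.{quote(table)}"
    names = []
    for row in spark.sql(query).collect():
        name = row.col_name
        # Assumption: metadata sections begin with a blank row or a row whose
        # col_name starts with "#"; everything after that is not a new column.
        if not name or name.startswith("#"):
            break
        names.append(name)
    return names


# Usage, given an active SparkSession named `spark`:
# columns = describe_columns(spark, "trial_db", "countrycurrency_csv")
```

With this helper the `spark.sql(f"use {db.databaseName}")` call is no longer needed, because every query names its database explicitly.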

Also, you never call the create_df function you defined.
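
Another option, not part of the solution above, is PySpark's catalog API (`spark.catalog.listTables` and `spark.catalog.listColumns`) instead of parsing SHOW TABLES and DESCRIBE output. The sketch below collects plain Python tuples first and builds the DataFrame once at the end, rather than unioning one small DataFrame per table. The function names `list_table_columns` and `to_dataframe` are hypothetical, and `spark` is again assumed to be an active SparkSession.

```python
def list_table_columns(spark, database):
    """Return one (database, table, [column names]) tuple per table."""
    records = []
    for table in spark.catalog.listTables(database):
        # Temporary views are session-scoped and do not belong to the
        # database, so they are skipped here.
        if table.isTemporary:
            continue
        columns = [c.name for c in spark.catalog.listColumns(table.name, database)]
        records.append((database, table.name, columns))
    return records


def to_dataframe(spark, records):
    """Build the result DataFrame in a single call from the collected tuples."""
    schema = "database string, table string, columns array<string>"
    return spark.createDataFrame(records, schema)


# Usage, given an active SparkSession named `spark`:
# tbl_df = to_dataframe(spark, list_table_columns(spark, "trial_db"))
# tbl_df.show()
```

The schema string is the same one used for the empty DataFrame in the solution above, so the result has the same three columns.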
