使用PySpark DataFrame迭代数据库

but5z9lq  于 2023-01-12  发布在  Spark
关注(0)|答案(1)|浏览(156)

我需要查询数据库中的200+表。通过使用spark.sql = f "" select ..."语句,我得到col(0)(因为查询的结果给我关于列的具体信息,我已经检索)和计算的结果为particuare表,像这样:
| 列(0)|
| - ------|
| 1个|
我的目标是有1 csv文件,与表的名称和计算结果:
| 表名|计数|
| - ------|- ------|
| 会计|三个|
| 销售额|1个|
到目前为止,我的代码的主要部分:

list_tables = ['accounting', 'sales',...]

for table in list_tables:
  df = spark.sql(
     f""" select distinct errors as counts from {database}.{table} where errors is not null""")

  df.repartition(1).write.mode("append").option("header","true").csv(f"s3:.......)
  rename_part_file(dir,output,newdir)

我对PySpark和所有的结构都是新手。到目前为止我很困惑,因为我听说迭代 Dataframe 不是最好的主意。
通过使用下面的代码,我只得到1 csv与最近的记录,而不是所有已处理的表从我的列表_表。我卡住了,不知道是否有可能把它全部打包成1个 Dataframe ,或者我应该联合 Dataframe ?

rhfm7lfc

rhfm7lfc1#

我卡住了,不知道是否有可能把所有的数据打包成一个 Dataframe ,或者我应该联合 Dataframe ?
你提到的两个选项都导致了同样的结果--你必须迭代一个表列表(你不能一次读取多个表),读取其中的每一个表,执行一个SQL语句并将结果保存到DataFrame中,然后联合所有的DataFrame并保存为一个CSV文件。

from pyspark.sql.functions import lit
from functools import reduce

tables = ["tableA", "tableB", "tableC"]
dfs = []
for table in tables:
    dfs.append(spark.read.table(table).sql("my sql statement").withColumn("TableName", lit(table))) # Append the DF with SQL query results

df = reduce(lambda df1, df2: df1.union(df2), dfs) # Union all DFs
df.coalesce(1).write.mode("overwrite").csv("my_csv.csv") # Combine and write as single file

注意:union操作只考虑列的位置,而不考虑其名称。我假设对于您的情况,这是所需的行为,因为您只提取了一个统计信息。

相关问题