我需要查询数据库中的200+表。通过使用spark.sql = f "" select ..."语句,我得到col(0)(因为查询的结果给我关于列的具体信息,我已经检索)和计算的结果为particuare表,像这样:
| 列(0)|
| - ------|
| 1个|
我的目标是有1 csv文件,与表的名称和计算结果:
| 表名|计数|
| - ------|- ------|
| 会计|三个|
| 销售额|1个|
到目前为止,我的代码的主要部分:
list_tables = ['accounting', 'sales',...]
for table in list_tables:
df = spark.sql(
f""" select distinct errors as counts from {database}.{table} where errors is not null""")
df.repartition(1).write.mode("append").option("header","true").csv(f"s3:.......)
rename_part_file(dir,output,newdir)
我对PySpark和所有的结构都是新手。到目前为止我很困惑,因为我听说迭代 Dataframe 不是最好的主意。
通过使用下面的代码,我只得到1 csv与最近的记录,而不是所有已处理的表从我的列表_表。我卡住了,不知道是否有可能把它全部打包成1个 Dataframe ,或者我应该联合 Dataframe ?
1条答案
按热度按时间rhfm7lfc1#
我卡住了,不知道是否有可能把所有的数据打包成一个 Dataframe ,或者我应该联合 Dataframe ?
你提到的两个选项都导致了同样的结果--你必须迭代一个表列表(你不能一次读取多个表),读取其中的每一个表,执行一个SQL语句并将结果保存到DataFrame中,然后联合所有的DataFrame并保存为一个CSV文件。
注意:
union
操作只考虑列的位置,而不考虑其名称。我假设对于您的情况,这是所需的行为,因为您只提取了一个统计信息。