pyspark delta table:如何保存分组的数据框架到不同的表

bfrts1fy  于 2023-10-23  发布在  Apache
关注(0)|答案(1)|浏览(149)

类似于这个question。我如何做同样的事情来写不同的组的嵌套框架到不同的delta活表?类似于下面的地方,我不受Pandas嵌套框架的限制。允许apply传递Spark嵌套框架或Spark会话到聚合函数。

def mycustomNotPandaAgg(key, Iterator, sparkSession|sparkDataframe):
   temp_df = sparkSession.createDataFrame(Iterator) #I can apply schema here
   temp_df.createOrReplaceTable("temp_df")
   sparkSession.sql('insert into ... key as select * from temp_df') #key is table_name
   or
   sparkDataframe.writeToTable(key)  #where sparkDataframe is created internally from each group and passed into this apply function

my_df.groupBy("table_name").apply(mycustomNotPandaAgg)

ps -我已经尝试过filter方法,我为每个表过滤相同的框架,得到N个框架(每个表1个)并保存它们。这不是有效的,因为每个键的数据是倾斜的。即使我在filterSpark之前persist框架仍然为每个过滤器启动作业。

c6ubokkw

c6ubokkw1#

一种无需将所有数据拉到驱动程序即可实现此目的的方法是收集不同的键,然后单独写入每个过滤后的DataFrame:

from pyspark.sql.functions import col

filters = rtd.select("CustomerID").distinct().collect()
for f in filters:
    rtd.filter(col("CustomerID") == f[0]).show() # Replace show() with your write logic.

请注意,不幸的是,你将不得不串行地编写输出表。可以使用多处理来解决这个问题,或者可能有另一个评论者使用更Spark原生的方式来编写groupBy结果,同时使用Spark进行并行化。

相关问题