类似于这个question。我如何做同样的事情来写不同的组的嵌套框架到不同的delta
活表?类似于下面的地方,我不受Pandas嵌套框架的限制。允许apply
传递Spark嵌套框架或Spark会话到聚合函数。
def mycustomNotPandaAgg(key, Iterator, sparkSession|sparkDataframe):
temp_df = sparkSession.createDataFrame(Iterator) #I can apply schema here
temp_df.createOrReplaceTable("temp_df")
sparkSession.sql('insert into ... key as select * from temp_df') #key is table_name
or
sparkDataframe.writeToTable(key) #where sparkDataframe is created internally from each group and passed into this apply function
my_df.groupBy("table_name").apply(mycustomNotPandaAgg)
ps -我已经尝试过filter
方法,我为每个表过滤相同的框架,得到N个框架(每个表1个)并保存它们。这不是有效的,因为每个键的数据是倾斜的。即使我在filter
Spark之前persist
框架仍然为每个过滤器启动作业。
1条答案
按热度按时间c6ubokkw1#
一种无需将所有数据拉到驱动程序即可实现此目的的方法是收集不同的键,然后单独写入每个过滤后的DataFrame:
请注意,不幸的是,你将不得不串行地编写输出表。可以使用多处理来解决这个问题,或者可能有另一个评论者使用更Spark原生的方式来编写groupBy结果,同时使用Spark进行并行化。