我有一些代码需要在每个“id”上运行,其中多个id可以出现在流批处理中,并且流按id进行分区,其中流包含具有相同id的多个示例,需要根据排序顺序进行合并
所以目前,为了对具有相同id的所有项运行“merge”函数,我将其作为rdd.mappartitions运行
mergedRdd = partitionedDf.rdd.mapPartitions(merge_payloads) # We use partition
mergedDf = spark.createDataFrame(mergedRdd)
据我目前所知,由于从jvm到python的转换(反之亦然),我付出了高昂的性能代价,有人建议我转向 applyInPandas
而Pypark的功能。
所以我想我能做的是
mergeDf = partitionedDf.groupBy('id').applyInPandas(merge_payloads_pd, 'id long, payload string')
这真的是等价的,但是有基于箭头的python/jvm转换的额外好处吗?还是我在这里遗漏了什么
1条答案
按热度按时间brccelvz1#
是的,根据文档,使用applyinpands函数,spark使用arrow并加快处理时间,避免了序列化/反序列化时间。您需要安装pyarrow并将配置spark.sql.execution.arrow.pyspark.enabled设置为true。