Pypark使用mapinpandas而不是rdd.mappartitions-是否等效

oxosxuxt  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(336)

我有一些代码需要在每个“id”上运行,其中多个id可以出现在流批处理中,并且流按id进行分区,其中流包含具有相同id的多个示例,需要根据排序顺序进行合并
所以目前,为了对具有相同id的所有项运行“merge”函数,我将其作为rdd.mappartitions运行

mergedRdd = partitionedDf.rdd.mapPartitions(merge_payloads) # We use partition

mergedDf = spark.createDataFrame(mergedRdd)

据我目前所知,由于从jvm到python的转换(反之亦然),我付出了高昂的性能代价,有人建议我转向 applyInPandas 而Pypark的功能。
所以我想我能做的是

mergeDf = partitionedDf.groupBy('id').applyInPandas(merge_payloads_pd, 'id long, payload string')

这真的是等价的,但是有基于箭头的python/jvm转换的额外好处吗?还是我在这里遗漏了什么

brccelvz

brccelvz1#

是的,根据文档,使用applyinpands函数,spark使用arrow并加快处理时间,避免了序列化/反序列化时间。您需要安装pyarrow并将配置spark.sql.execution.arrow.pyspark.enabled设置为true。

相关问题