了解如何在并行转换多个 Dataframe 时实现最佳并行性
我有一个路径数组
val paths = Array("path1", "path2", .....
我从每个路径加载 Dataframe ,然后转换并写入目标路径
paths.foreach(path => {
val df = spark.read.parquet(path)
df.transform(processData).write.parquet(path+"_processed")
})
转换processData
与我正在加载的 Dataframe 无关。
这限制了我一次只能处理一个 Dataframe ,而且我的大部分集群资源都是空闲的,因为处理每个 Dataframe 是独立的,所以我把scala的Array
转换成了ParArray
。
paths.par.foreach(path => {
val df = spark.read.parquet(path)
df.transform(processData).write.parquet(path+"_processed")
})
现在它在集群中使用了更多的资源,我仍然在尝试理解它是如何工作的,以及如何微调这里的并行处理
1.如果我使用ForkJoinPool
增加默认的scala并行度到更高的数值,会不会导致更多的线程在驱动端产生,并将处于锁定状态等待foreach
函数完成,最终杀死驱动?
1.它如何影响像EventLoggingListnener
这样的集中式Spark,因为多个 Dataframe 是并行处理的,所以EventLoggingListnener
需要处理更多的事件流入。
1.要实现最佳资源利用率,我需要考虑哪些参数。
1.任何其他方法
我可以通过任何资源来了解这种扩展,这将非常有帮助
2条答案
按热度按时间dfty9e191#
速度慢的原因是Spark非常擅长并行计算存储在一个大 Dataframe 中的大量数据。它在处理大量的 Dataframe 方面非常糟糕。2它将使用所有的执行器在一个 Dataframe 上开始计算(即使它们并不都需要),并等待它完成后再开始下一个。这会导致大量的非活动处理器。这很糟糕,但"这不是Spark设计的目的。
我有一个诀窍给你。可能需要改进一点,但你会有想法。下面是我会做的。从一个路径列表中,我会提取parquet文件的所有模式,并创建一个新的大模式来收集所有列。然后,我将要求spark读取使用此模式的所有parquet文件(不存在的列将被自动设置为空)。然后,我将联合所有 Dataframe ,并在这个大 Dataframe 上执行转换,最后使用
partitionBy
将 Dataframe 存储在单独的文件中,同时仍然并行地进行所有的工作。它看起来是这样的。然而,我不推荐在大量的 Dataframe 上使用
unionAll
,因为spark对执行计划的分析,它在处理大量的 Dataframe 时会非常慢,我会使用RDD版本,尽管它更冗长。看看我处理过的目录,我得到了这个:
2ic8powd2#
你应该考虑一些变量,最重要的是:CPU内核,每个DF的大小和一点未来的使用。建议是决定每个DF的优先级处理。你可以使用公平配置,但这是不够的,并行处理可能会占用你的集群的一大部分。你必须分配优先级的DF和使用未来池来控制并行作业的数量运行在你的应用程序。