如何决定输入数据大小和集群资源所需的分区数?

bvjxkvbb  于 2021-05-30  发布在  Hadoop
关注(0)|答案(3)|浏览(356)

我的用例如下所述。
使用sparkcontext.textfile(输入路径)从本地文件系统读取输入数据。
使用rdd.coalesce(numberofpartitions)将输入数据(8000万条记录)分区,然后将其提交给mapper/reducer函数。如果不对输入数据使用coalesce()或repartition(),spark执行速度非常慢,并且会失败,并出现内存不足异常。
我在这里面临的问题是决定应用于输入数据的分区数。每次输入的数据大小都不同,硬编码一个特定的值不是一个选项。spark只有在对输入数据应用某种最佳划分时才能表现得很好,我必须对其进行大量迭代(反复试验)。这在生产环境中不是一个选项。
我的问题是:是否有一个经验法则来根据输入数据大小和可用的集群资源(执行器、核心等)来决定所需的分区数?如果是,请给我指那个方向。非常感谢您的帮助。
我在Yarn上使用spark 1.0。
谢谢,阿格

r8xiu3jd

r8xiu3jd1#

spark官方文档中关于调优spark的两个注解:
1-通常,我们建议集群中每个cpu核心执行2-3个任务。
2-spark可以有效地支持最短为200ms的任务,因为它跨多个任务重用一个executor jvm,而且它具有较低的任务启动成本,因此您可以安全地将并行级别提高到集群中的核心数以上。
这是两条tumb规则,可以帮助您估计分区的数量和大小。所以,最好有小任务(可以在100毫秒内完成)。

vnjpjtjt

vnjpjtjt2#

确定分区的数量有点棘手。默认情况下,spark将尝试推断出合理数量的分区。注意:如果您正在使用textfile方法压缩文本,那么spark将禁用拆分,然后您将需要重新分区(听起来这可能是正在发生的事情?)。在使用sc.textfile加载非压缩数据时,还可以指定最小分区数(例如sc.textfile(path,minpartitions))。
coalesce函数仅用于减少分区数,因此应考虑使用repartition()函数。
至于选择一个“好”的数字,您通常希望至少与并行执行器的数量相同。已经存在一些逻辑来尝试确定“良好”的并行量,您可以通过调用sc.defaultparallelism来获得这个值

eulz3vhy

eulz3vhy3#

我假设您知道要进入的集群的大小,那么您基本上可以尝试将数据划分为该集群的若干倍&使用rangepartitioner对数据进行大致相等的划分。动态分区是根据文件系统上的块数来创建的&因此,调度如此多任务的任务开销很大程度上会降低性能。

import org.apache.spark.RangePartitioner;
var file=sc.textFile("<my local path>")
var partitionedFile=file.map(x=>(x,1))
var data= partitionedFile.partitionBy(new RangePartitioner(3, partitionedFile))

相关问题