当使用spark将文件写入hdfs时,在不使用分区时速度非常快。相反,当我使用分区来写文件时,写延迟增加了24倍。
对于同一个文件,不使用分区进行写入大约需要600毫秒,使用分区id进行写入(正好会生成1.000个分区,因为文件中有1.000个id)大约需要14秒。
你们中的一些人有过编写分区文件需要很长时间的相同经历吗?造成这种情况的根本原因是什么?可能spark需要为每个分区创建1000个文件夹和文件?你知道怎么加速吗?
val myRdd = streamedRdd.map { case ((id, metric, time), value) => Record(id, metric, getEpoch(time), time, value) }
val df = myRdd.toDF
df.write.mode(SaveMode.Append)
.partitionBy("id")
.parquet(path)
1条答案
按热度按时间vmpqdwk31#
spark执行器与HDF通信以写入它们拥有的数据,这取决于分区后数据在集群中的分布方式。
显然,对于较小的数据块,与按顺序写入整个文件相比,建立从多个executor节点到hdfs的连接并进行写入所需的时间会更多。
如何避免这种情况:
默认情况下,spark使用hash partitioner对数据进行分区(哈希键和哈希相同的键到达同一节点)尝试指定range partitioner,请查找下面的示例片段:
下面的代码段使用hash分区器yourrdd.groupbykey().saveastextfile(“hdfs路径”);
下面的代码段使用我们的自定义范围分区器,它创建了8个分区,如中所述
RangePartitioner(8, yourRdd)
通过8个连接进行写作比通过1000个连接进行写作更好。这也是要写入的数据和创建的分区数量之间的权衡。