我在读成千上万的文件到rdd通过类似 sc.textFile("/data/*/*/*")
一个问题是,这些文件中的大多数都很小,而其他文件则很大。这会导致任务不平衡,从而导致各种众所周知的问题。
我可以通过读取数据来分解最大的分区吗 sc.textFile("/data/*/*/*", minPartitions=n_files*5)
,在哪里 n_files
是输入文件的数量吗?
在stackoverflow的其他地方, minPartitions
通过hadoop rabit漏洞,并用于 org.apache.hadoop.mapred.TextInputFormat.getSplits
. 我的问题是,这是否实现为首先拆分最大的文件。换句话说,分割策略是一种试图导致大小均匀的分区的策略吗?
我更希望有一个答案能指出在最近版本的spark/hadoop中,分裂策略实际上是在哪里实现的。
1条答案
按热度按时间uyto3xhc1#
没有人发布答案,所以我自己深入研究,并将发布我自己问题的答案:
如果您的输入文件是可拆分的,那么
textFile
如果您使用minpartitions选项,将确实尝试平衡分区大小。分区策略在这里实现,即
getSplits
方法org.apache.hadoop.mapred.TextInputFormat
. 这个分区策略很复杂,并且是通过第一个设置来操作的goalSize
,即输入的总大小除以numSplits
(minPartitions
向下传递以设置numSplits
). 然后,它以这样一种方式拆分文件,即尝试确保每个分区的大小(就其输入的字节大小而言)尽可能接近目标大小/如果您的输入文件不可拆分,则不会进行此拆分:请参阅此处的源代码。