我有许多包含数百万行格式的文件:
id, created_date, some_value_a, some_value_b, some_value_c
这种重新分区的方式非常慢,为我创建了超过百万个~500b的小文件:
rdd_df = rdd.toDF(["id", "created_time", "a", "b", "c"])
rdd_df.write.partitionBy("id").csv("output")
我想实现输出文件,其中每个文件包含像10000个唯一的ID和它们的所有行。
我怎么能做到这样?
2条答案
按热度按时间nhjlsmyf1#
你需要这样的东西:
或者老实说-让作业决定分区的数量,而不是重新分区。从理论上讲,这应该更快:
ovfsdjhp2#
可以通过添加随机salt键来重新分区。
主要概念是
each partition will be written as 1 file
. 因此,您必须将所需的行按repartition(numPart,col("id"),rand)
.前4-5个操作只是计算每个文件需要多少个分区才能获得近10000个ID。
假设每个分区有10000个ID进行计算
角盒:如果一个
id
行太多,不适合上面计算的分区大小。因此,我们根据出现的id的最大计数来计算分区的数量
以两个noofpartitons中的最小值为例
rand
因此,我们可以在一个分区中引入多个ID注意:虽然这会给你更大的文件和每个文件将包含一组唯一的ID肯定。但这涉及
shuffling
,因此您的操作实际上可能比您所提到的代码慢。