如何按键重新划分rdd,然后将其打包到碎片中?

vwkv1x7d  于 2021-05-19  发布在  Spark
关注(0)|答案(2)|浏览(446)

我有许多包含数百万行格式的文件:

id, created_date, some_value_a, some_value_b, some_value_c

这种重新分区的方式非常慢,为我创建了超过百万个~500b的小文件:

rdd_df = rdd.toDF(["id", "created_time", "a", "b", "c"])
    rdd_df.write.partitionBy("id").csv("output")

我想实现输出文件,其中每个文件包含像10000个唯一的ID和它们的所有行。
我怎么能做到这样?

nhjlsmyf

nhjlsmyf1#

你需要这样的东西:

rdd_df.repartition(*number of partitions you want*).write.csv("output", header = True)

或者老实说-让作业决定分区的数量,而不是重新分区。从理论上讲,这应该更快:

rdd_df.write.csv("output", header = True)
ovfsdjhp

ovfsdjhp2#

可以通过添加随机salt键来重新分区。

val totRows = rdd_df.count

val maxRowsForAnId = rdd_df.groupBy("id").count().agg(max("count"))
val numParts1 = totRows/maxRowsForAnId

val totalUniqueIds = rdd_df.select("id").distinct.count
val numParts2 = totRows/(10000*totalUniqueIds)

val numPart = numParts1.min(numParts2)

rdd_df
 .repartition(numPart,col("id"),rand)
 .csv("output")

主要概念是 each partition will be written as 1 file . 因此,您必须将所需的行按 repartition(numPart,col("id"),rand) .
前4-5个操作只是计算每个文件需要多少个分区才能获得近10000个ID。
假设每个分区有10000个ID进行计算
角盒:如果一个 id 行太多,不适合上面计算的分区大小。
因此,我们根据出现的id的最大计数来计算分区的数量
以两个noofpartitons中的最小值为例 rand 因此,我们可以在一个分区中引入多个ID
注意:虽然这会给你更大的文件和每个文件将包含一组唯一的ID肯定。但这涉及 shuffling ,因此您的操作实际上可能比您所提到的代码慢。

相关问题