好像我错过了Spark中的repartition。AFAIK,你可以用一个键重新分区:df.repartition("key"),在这种情况下,Spark将使用散列分区方法。您可以只设置分区数来重新分区:df.repartition(10),其中Spark将使用循环分区方法。在哪种情况下,如果以循环方式仅使用列号进行重新分区,则循环分区将具有数据不对称,这将需要使用salt来均等地随机化结果?
repartition
df.repartition("key")
df.repartition(10)
bfnvny8b1#
在df.repartition(10)中,你不能有不对称,正如你提到的,spark使用循环分区方法,这样分区就有相同的大小。我们可以检查:
spark.range(100000).repartition(5).explain
== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- Exchange RoundRobinPartitioning(5), REPARTITION_BY_NUM, [id=#1380] +- Range (0, 100000, step=1, splits=16)
spark.range(100000).repartition(5).groupBy(spark_partition_id).count
+--------------------+-----+ |SPARK_PARTITION_ID()|count| +--------------------+-----+ | 0|20000| | 1|20000| | 2|20000| | 3|20000| | 4|20000| +--------------------+-----+
如果使用df.repartition("key"),则会发生一些不同的情况:
// let's specify the number of partitions as well spark.range(100000).repartition(5, 'id).explain
== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- Exchange hashpartitioning(id#352L, 5), REPARTITION_BY_NUM, [id=#1424] +- Range (0, 100000, step=1, splits=16)
我们试试看:
spark.range(100000).repartition(5, 'id).groupBy(spark_partition_id).count.show
+--------------------+-----+ |SPARK_PARTITION_ID()|count| +--------------------+-----+ | 0|20128| | 1|20183| | 2|19943| | 3|19940| | 4|19806| +--------------------+-----+
列的每个元素都经过散列处理,散列值在分区之间拆分。因此,分区的大小相似,但并不完全相同。**但是,**具有相同键的两行最终必然位于同一个分区中。因此,如果您的键不对称(一个或多个特定键在 Dataframe 中过度表示),您的分区也将不对称:
spark.range(100000) .withColumn("key", when('id < 1000, 'id).otherwise(lit(0))) .repartition(5, 'key) .groupBy(spark_partition_id).count.show
+--------------------+-----+ |SPARK_PARTITION_ID()|count| +--------------------+-----+ | 0|99211| | 1| 196| | 2| 190| | 3| 200| | 4| 203| +--------------------+-----+
1条答案
按热度按时间bfnvny8b1#
在
df.repartition(10)
中,你不能有不对称,正如你提到的,spark使用循环分区方法,这样分区就有相同的大小。我们可以检查:
如果使用
df.repartition("key")
,则会发生一些不同的情况:我们试试看:
列的每个元素都经过散列处理,散列值在分区之间拆分。因此,分区的大小相似,但并不完全相同。**但是,**具有相同键的两行最终必然位于同一个分区中。因此,如果您的键不对称(一个或多个特定键在 Dataframe 中过度表示),您的分区也将不对称: