pyspark Spark:重新分区和按范围重新分区有什么区别?

kg7wmglp  于 2022-11-01  发布在  Spark
关注(0)|答案(2)|浏览(201)

我浏览了以下文档:https://spark.apache.org/docs/latest/api/python/pyspark.sql.html
上面写着:

  • 对于重新分区:生成 Dataframe 是哈希分区
  • 对于按范围重新分区:生成 Dataframe 是范围分区

而且一个previous question也提到了,但是我还是不明白它们到底有什么不同,选择一个会有什么影响?

更重要的是,如果repartition执行散列分区,则提供列作为其参数会产生什么影响?

vxf3dgd4

vxf3dgd41#

我认为最好通过一些实验来研究其中的区别。

测试 Dataframe

在这个实验中,我使用了以下两个Dataframe(我展示的是Scala中的代码,但其概念与Python API相同):

// Dataframe with one column "value" containing the values ranging from 0 to 1000000
val df = Seq(0 to 1000000: _*).toDF("value")

// Dataframe with one column "value" containing 1000000 the number 0 in addition to the numbers 5000, 10000 and 100000
val df2 = Seq((0 to 1000000).map(_ => 0) :+ 5000 :+ 10000 :+ 100000: _*).toDF("value")

理论

  • repartition在提供了一个或多个列时应用HashPartitioner,在未提供列时应用RoundRobinPartitioner。如果提供了一个或多个列(HashPartitioner),则将对这些值进行散列处理,并通过计算类似于partition = hash(columns) % numberOfPartitions的值来确定分区号。如果未提供列(RoundRobinPartitioner),则数据将平均分布在指定数量的分区中。
  • repartitionByRange将根据列值的 * 范围 * 对数据进行分区。这通常用于连续(非离散)值,如任何类型的数字。请注意,由于性能原因,此方法使用抽样来估计范围。因此,由于抽样可能返回不同的值,因此输出可能不一致。抽样大小可以由配置spark.sql.execution.rangeExchange.sampleSizePerPartition控制。

同样值得一提的是,对于这两种方法,如果没有给出numPartitions,默认情况下,它会将Dataframe数据划分到Spark会话中配置的spark.sql.shuffle.partitions中,并且可以通过Adaptive Query Execution(自Spark 3.x起可用)进行合并。

测试设置

基于给定的Testdata,我总是应用相同的代码:

val testDf = df
// here I will insert the partition logic
    .withColumn("partition", spark_partition_id()) // applying SQL built-in function to determine actual partition
    .groupBy(col("partition"))
    .agg(
      count(col("value")).as("count"),
      min(col("value")).as("min_value"),
      max(col("value")).as("max_value"))
    .orderBy(col("partition"))

testDf.show(false)

测试结果

df.重新分区(4,col(“值”))

正如预期的那样,我们得到了4个分区,并且由于df的值的范围从0到1000000,因此我们看到它们的哈希值将产生分布良好的Dataframe。

+---------+------+---------+---------+
|partition|count |min_value|max_value|
+---------+------+---------+---------+
|0        |249911|12       |1000000  |
|1        |250076|6        |999994   |
|2        |250334|2        |999999   |
|3        |249680|0        |999998   |
+---------+------+---------+---------+

df.按范围重新分区(4,列(“值”))

同样在这个例子中,我们得到了4个分区,但是这次的最小值和最大值清楚地显示了分区内的值的范围。每个分区几乎平均分布了250000个值。

+---------+------+---------+---------+
|partition|count |min_value|max_value|
+---------+------+---------+---------+
|0        |244803|0        |244802   |
|1        |255376|244803   |500178   |
|2        |249777|500179   |749955   |
|3        |250045|749956   |1000000  |
+---------+------+---------+---------+

df2.repartition(4,列(“值”))

现在,我们使用另一个Dataframe df2。在这里,哈希算法只对0、5000、10000或100000值进行哈希。当然,值0的哈希值始终相同,因此所有零都在同一个分区中结束(在本例中为分区3)。另外两个分区只包含一个值。

+---------+-------+---------+---------+
|partition|count  |min_value|max_value|
+---------+-------+---------+---------+
|0        |1      |100000   |100000   |
|1        |1      |10000    |10000    |
|2        |1      |5000     |5000     |
|3        |1000001|0        |0        |
+---------+-------+---------+---------+

df2.重新分区(4)

如果不使用列“value”的内容,repartition方法将以循环方式分发消息。所有分区的数据量几乎相同。

+---------+------+---------+---------+
|partition|count |min_value|max_value|
+---------+------+---------+---------+
|0        |250002|0        |5000     |
|1        |250002|0        |10000    |
|2        |249998|0        |100000   |
|3        |250002|0        |0        |
+---------+------+---------+---------+

(4,col(“数值”))
本例表明,Dataframe df2未针对按范围重新分区进行良好定义,因为几乎所有值都为0。因此,我们最终只有两个分区,而分区0包含全零。

+---------+-------+---------+---------+
|partition|count  |min_value|max_value|
+---------+-------+---------+---------+
|0        |1000001|0        |0        |
|1        |3      |5000     |100000   |
+---------+-------+---------+---------+
sg3maiej

sg3maiej2#

通过使用df.explain,您可以获得有关这些操作的大量信息。
我将使用此DataFrame作为示例:

df = spark.createDataFrame([(i, f"value {i}") for i in range(1, 22, 1)], ["id", "value"])

重新分割

根据是否指定了键表达式(列),分区方法会有所不同。它并不总是如您所说的散列分区。

df.repartition(3).explain(True)

== Parsed Logical Plan ==
Repartition 3, true
+- LogicalRDD [id#0L, value#1], false

== Analyzed Logical Plan ==
id: bigint, value: string
Repartition 3, true
+- LogicalRDD [id#0L, value#1], false

== Optimized Logical Plan ==
Repartition 3, true
+- LogicalRDD [id#0L, value#1], false

== Physical Plan ==
Exchange RoundRobinPartitioning(3)
+- Scan ExistingRDD[id#0L,value#1]

我们可以在生成的物理计划中看到使用了RoundRobinPartitioning
表示一种分区,在该分区中,通过从随机目的分区号开始并以循环方式分布行,将行平均分布在输出分区中.在实现DataFrame.repartition()运算符时使用此分区.
使用按列重新分区表达式时:

df.repartition(3, "id").explain(True)

== Parsed Logical Plan ==
'RepartitionByExpression ['id], 3
+- LogicalRDD [id#0L, value#1], false

== Analyzed Logical Plan ==
id: bigint, value: string
RepartitionByExpression [id#0L], 3
+- LogicalRDD [id#0L, value#1], false

== Optimized Logical Plan ==
RepartitionByExpression [id#0L], 3
+- LogicalRDD [id#0L, value#1], false

== Physical Plan ==
Exchange hashpartitioning(id#0L, 3)
+- Scan ExistingRDD[id#0L,value#1]

现在选择的分区方法是hashpartitioning。在散列分区方法中,为每个键表达式计算Java Object.hashCode,以通过计算模数来确定目标partition_idkey.hashCode % numPartitions .

按范围重新分区

此分割方法会根据分割索引键,建立numPartitions连续且不重叠的值范围。因此,至少需要一个索引键表示式,而且必须是可排序的。

df.repartitionByRange(3, "id").explain(True)

== Parsed Logical Plan ==
'RepartitionByExpression ['id ASC NULLS FIRST], 3
+- LogicalRDD [id#0L, value#1], false

== Analyzed Logical Plan ==
id: bigint, value: string
RepartitionByExpression [id#0L ASC NULLS FIRST], 3
+- LogicalRDD [id#0L, value#1], false

== Optimized Logical Plan ==
RepartitionByExpression [id#0L ASC NULLS FIRST], 3
+- LogicalRDD [id#0L, value#1], false

== Physical Plan ==
Exchange rangepartitioning(id#0L ASC NULLS FIRST, 3)
+- Scan ExistingRDD[id#0L,value#1]

查看生成的物理计划,我们可以看到rangepartitioning与上面描述的两个不同之处在于分区表达式中存在排序子句。当表达式中没有明确指定排序顺序时,默认情况下它使用升序。

一些有趣的链接:

相关问题