如何配置Spark在join或groupby之后调整输出分区的数量?

sdnqo3pr  于 2023-05-23  发布在  Apache
关注(0)|答案(1)|浏览(309)

bounty还有6天到期。回答此问题可获得+300声望奖励。Rinat Veliakhmedov正在寻找典型答案

我知道你可以设置spark.sql.shuffle.partitionsspark.sql.adaptive.advisoryPartitionSizeInBytes。前者不适用于自适应查询执行,而后者由于某种原因只适用于第一次 Shuffle ,之后它只使用默认的分区数量,即#核心。
有没有办法配置AQE来调整分区的数量,使每个分区不超过100MB?

z8dt9xmd

z8dt9xmd1#

不确定您正在使用哪个版本的Spark,但您可以尝试将spark.sql.adaptive.coalescePartitions.minPartitionNum设置为某个值,开始时您可以尝试使用与sql.shuffle.partitions相同的值
我希望通过这个设置,你可以同时拥有这两种功能--小分区的自动合并+aqe对偏斜的处理,但是当有很多事情要做时,它会尝试从spark.sql.adaptive.coalescePartitions.minPartitionNum中保留最小数量的分区
目前,我还没有看到任何其他方法来强制spark动态计算它,以保持分区不大于100 mb。
为什么我认为它可能会改变一些事情:
以下是此参数的说明:

val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
    buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum")
      .internal()
      .doc("(deprecated) The suggested (not guaranteed) minimum number of shuffle partitions " +
        "after coalescing. If not set, the default value is the default parallelism of the " +
        "Spark cluster. This configuration only has an effect when " +
        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
        s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.")
      .version("3.0.0")
      .intConf
      .checkValue(_ > 0, "The minimum number of partitions must be positive.")
      .createOptional

所以它是可选的,现在让我们检查它在哪里使用Spark代码:

// Ideally, this rule should simply coalesce partitions w.r.t. the target size specified by
// ADVISORY_PARTITION_SIZE_IN_BYTES (default 64MB). To avoid perf regression in AQE, this
// rule by default tries to maximize the parallelism and set the target size to
// `total shuffle size / Spark default parallelism`. In case the `Spark default parallelism`
// is too big, this rule also respect the minimum partition size specified by
// COALESCE_PARTITIONS_MIN_PARTITION_SIZE (default 1MB).
// For history reason, this rule also need to support the config
// COALESCE_PARTITIONS_MIN_PARTITION_NUM. We should remove this config in the future.
val minNumPartitions = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM).getOrElse {
  if (conf.getConf(SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST)) {
    // We fall back to Spark default parallelism if the minimum number of coalesced partitions
    // is not set, so to avoid perf regressions compared to no coalescing.
    session.sparkContext.defaultParallelism
  } else {
    // If we don't need to maximize the parallelism, we set `minPartitionNum` to 1, so that
    // the specified advisory partition size will be respected.
    1
  }
}

看起来,当没有设置此参数并且spark.sql.adaptive.coalescePartitions.parallelismFirst设置为true(默认为true)时,Spark将选择默认并行度作为minPartitionNum。可能这就是您看到分区数量等于核心数量的原因
如果我理解正确的话,如果你设置spark.sql.adaptive.coalescePartitions.minPartitionNum,它应该可以做到这一点,并允许你对分区有更多的控制。
如果它没有帮助,或者您期望其他东西,您可以尝试使用其他sql.adaptive参数并检查它们在源代码中的使用情况。
我认为this blog post可能是一个很好的起点

相关问题