pyspark 什么是spark DAG中的shufflequerage?

mwkjh3gx  于 2022-12-17  发布在  Spark
关注(0)|答案(2)|浏览(174)

我在spark DAG中看到的shufflequerystage框是什么?它与spark stages中的excahnge框有何不同?

g6ll5ycj

g6ll5ycj1#

这里已经有了一个很好的答案,但这只是为了通过查看源代码来为您提供一些关于shufflequerystage实际上是什么的更多信息。

什么是随机查询阶段?

如果我们查看Spark的ShuffleQueryStageExec案例类的源代码,我们会看到以下内容:

case class ShuffleQueryStageExec(
    override val id: Int,
    override val plan: SparkPlan,
    override val _canonicalized: SparkPlan) extends QueryStageExec {
...
}

那么ShuffleQueryStageExec extends QueryStageExec。让我们看看QueryStageExec。代码注解很有启发性:
查询阶段是查询计划的独立子图。查询阶段在继续查询计划的其他运算符之前实体化其输出。实体化输出的数据统计信息可用于优化后续查询阶段。
有两种查询阶段:

  1. Shuffle查询阶段:这个阶段将其输出物化为shuffle文件,Spark启动另一个作业来执行进一步的操作符。
    1.广播查询阶段。该阶段将其输出物化为驱动程序JVM中的数组。Spark在执行进一步的操作符之前广播该数组。
    简而言之,ShuffleQueryStage是整个查询计划的一部分,它的数据统计信息可用于优化后续查询阶段,这是自适应查询执行(AQE)的全部内容。

如何创建这样的随机查询阶段?

为了更好地理解这一切是如何工作的,我们可以试着理解shuffle查询阶段是如何进行的,AdaptiveSparkPlanExec case类是一个有趣的地方。
有很多动作(collect、take、tail、execute...)可以触发withFinalPlanUpdate函数,而withFinalPlanUpdate函数又会触发getFinalPhysicalPlan函数,在这个函数中,createQueryStages函数被调用,这就是它的有趣之处。
createQueryStages函数是遍历整个计划树的递归函数,它看起来有点像这样:

private def createQueryStages(plan: SparkPlan): CreateStageResult = plan match {
    case e: Exchange =>
      // First have a quick check in the `stageCache` without having to traverse down the node.
      context.stageCache.get(e.canonicalized) match {
        case Some(existingStage) if conf.exchangeReuseEnabled =>
          ...

        case _ =>
          val result = createQueryStages(e.child)
          val newPlan = e.withNewChildren(Seq(result.newPlan)).asInstanceOf[Exchange]
          // Create a query stage only when all the child query stages are ready.
          if (result.allChildStagesMaterialized) {
            var newStage = newQueryStage(newPlan)
            ...
      }

所以你看,如果我们跳到一个已经执行过的Exchange上,并且我们想重用它,我们就这样做,但是如果不是这样,我们将创建一个新的计划并调用newQueryStage函数。
故事到此结束,newQueryStage函数如下所示:

private def newQueryStage(e: Exchange): QueryStageExec = {
    val optimizedPlan = optimizeQueryStage(e.child, isFinalStage = false)
    val queryStage = e match {
      case s: ShuffleExchangeLike =>
        ...
        ShuffleQueryStageExec(currentStageId, newShuffle, s.canonicalized)
      case b: BroadcastExchangeLike =>
        ...
        BroadcastQueryStageExec(currentStageId, newBroadcast, b.canonicalized)
    }
    ...
  }

因此,我们看到ShuffleQueryStageExec正在生成!对于每个Exchange,如果尚未发生,或者您没有重用交换,AQE将添加一个ShuffleQueryStageExecBroadcastQueryStageExec
希望这能为这是什么带来更多的见解:)

fxnxkyjh

fxnxkyjh2#

shufflequerystage连接到AQE,它们通过交换被添加到每个阶段之后,并且用于在每个阶段之后物化结果,以及基于统计优化剩余计划。
因此,我的简短回答是:
交换-在这里您的数据被打乱
Shufflequerystage -为AQE目的添加,以使用运行时统计信息和重新优化计划
在下面的例子中,我试图展示这种机制
下面是示例代码:

import org.apache.spark.sql.functions._

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.adaptive.enabled", true)

val input = spark.read
  .format("csv")
  .option("header", "true")
  .load(
    "dbfs:/FileStore/shared_uploads/**@gmail.com/city_temperature.csv"
  )
val dataForInput2 = Seq(
  ("Algeria", "3"),
  ("Germany", "3"),
  ("France", "5"),
  ("Poland", "7"),
  ("test55", "86")
)
val input2 = dataForInput2
  .toDF("Country", "Value")
  .withColumn("test", lit("test"))
val joinedDfs = input.join(input2, Seq("Country"))
val finalResult =
  joinedDfs.filter(input("Country") === "Poland").repartition(200)
finalResult.show

我正在从文件中阅读数据,但您可以用代码中创建的小df替换它,因为我添加了一些行来禁用广播。我添加了一些withColumn和repartition,使其更有趣
首先,让我们看看禁用AQE的计划:

== Physical Plan ==
CollectLimit (11)
+- Exchange (10)
   +- * Project (9)
      +- * SortMergeJoin Inner (8)
         :- Sort (4)
         :  +- Exchange (3)
         :     +- * Filter (2)
         :        +- Scan csv  (1)
         +- Sort (7)
            +- Exchange (6)
               +- LocalTableScan (5)

现在启用AQE

== Physical Plan ==
AdaptiveSparkPlan (25)
+- == Final Plan ==
   CollectLimit (16)
   +- ShuffleQueryStage (15), Statistics(sizeInBytes=1447.8 KiB, rowCount=9.27E+3, isRuntime=true)
      +- Exchange (14)
         +- * Project (13)
            +- * SortMergeJoin Inner (12)
               :- Sort (6)
               :  +- AQEShuffleRead (5)
               :     +- ShuffleQueryStage (4), Statistics(sizeInBytes=1158.3 KiB, rowCount=9.27E+3, isRuntime=true)
               :        +- Exchange (3)
               :           +- * Filter (2)
               :              +- Scan csv  (1)
               +- Sort (11)
                  +- AQEShuffleRead (10)
                     +- ShuffleQueryStage (9), Statistics(sizeInBytes=56.0 B, rowCount=1, isRuntime=true)
                        +- Exchange (8)
                           +- LocalTableScan (7)

代码是相同的,唯一的区别是AQE,但现在您可以看到ShuffleQueryStage在每次交换后弹出
让我们看一下您的示例中的Dag可视化。
首先,我们来看一下包含join的作业3

然后是job 4,它只是重用先前计算的内容,但添加了额外的第4阶段,ShuffleQueryStage与您的情况类似

相关问题