case class ShuffleQueryStageExec(
override val id: Int,
override val plan: SparkPlan,
override val _canonicalized: SparkPlan) extends QueryStageExec {
...
}
private def createQueryStages(plan: SparkPlan): CreateStageResult = plan match {
case e: Exchange =>
// First have a quick check in the `stageCache` without having to traverse down the node.
context.stageCache.get(e.canonicalized) match {
case Some(existingStage) if conf.exchangeReuseEnabled =>
...
case _ =>
val result = createQueryStages(e.child)
val newPlan = e.withNewChildren(Seq(result.newPlan)).asInstanceOf[Exchange]
// Create a query stage only when all the child query stages are ready.
if (result.allChildStagesMaterialized) {
var newStage = newQueryStage(newPlan)
...
}
2条答案
按热度按时间g6ll5ycj1#
这里已经有了一个很好的答案,但这只是为了通过查看源代码来为您提供一些关于
shufflequerystage
实际上是什么的更多信息。什么是随机查询阶段?
如果我们查看Spark的ShuffleQueryStageExec案例类的源代码,我们会看到以下内容:
那么
ShuffleQueryStageExec extends QueryStageExec
。让我们看看QueryStageExec。代码注解很有启发性:查询阶段是查询计划的独立子图。查询阶段在继续查询计划的其他运算符之前实体化其输出。实体化输出的数据统计信息可用于优化后续查询阶段。
有两种查询阶段:
1.广播查询阶段。该阶段将其输出物化为驱动程序JVM中的数组。Spark在执行进一步的操作符之前广播该数组。
简而言之,
ShuffleQueryStage
是整个查询计划的一部分,它的数据统计信息可用于优化后续查询阶段,这是自适应查询执行(AQE)的全部内容。如何创建这样的随机查询阶段?
为了更好地理解这一切是如何工作的,我们可以试着理解shuffle查询阶段是如何进行的,AdaptiveSparkPlanExec case类是一个有趣的地方。
有很多动作(collect、take、tail、execute...)可以触发
withFinalPlanUpdate
函数,而withFinalPlanUpdate
函数又会触发getFinalPhysicalPlan
函数,在这个函数中,createQueryStages函数被调用,这就是它的有趣之处。createQueryStages函数是遍历整个计划树的递归函数,它看起来有点像这样:
所以你看,如果我们跳到一个已经执行过的
Exchange
上,并且我们想重用它,我们就这样做,但是如果不是这样,我们将创建一个新的计划并调用newQueryStage
函数。故事到此结束,
newQueryStage
函数如下所示:因此,我们看到
ShuffleQueryStageExec
正在生成!对于每个Exchange
,如果尚未发生,或者您没有重用交换,AQE将添加一个ShuffleQueryStageExec
或BroadcastQueryStageExec
。希望这能为这是什么带来更多的见解:)
fxnxkyjh2#
shufflequerystage连接到AQE,它们通过交换被添加到每个阶段之后,并且用于在每个阶段之后物化结果,以及基于统计优化剩余计划。
因此,我的简短回答是:
交换-在这里您的数据被打乱
Shufflequerystage -为AQE目的添加,以使用运行时统计信息和重新优化计划
在下面的例子中,我试图展示这种机制
下面是示例代码:
我正在从文件中阅读数据,但您可以用代码中创建的小df替换它,因为我添加了一些行来禁用广播。我添加了一些withColumn和repartition,使其更有趣
首先,让我们看看禁用AQE的计划:
现在启用AQE
代码是相同的,唯一的区别是AQE,但现在您可以看到ShuffleQueryStage在每次交换后弹出
让我们看一下您的示例中的Dag可视化。
首先,我们来看一下包含join的作业3
然后是job 4,它只是重用先前计算的内容,但添加了额外的第4阶段,ShuffleQueryStage与您的情况类似