Spark ENSURE_REQUIREMENTS explained

e5nszbig · posted 2022-11-16 in Apache

Can someone explain, with a practical example, how ENSURE_REQUIREMENTS comes into play?
What I have read on the topic has not really made it clear to me.
I looked at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala, but I cannot tell what it actually is. Some kind of insurance by Spark that things go well? I find the documentation cryptic.
You can refer to another SO question of mine where I ran some experiments but never got to the bottom of why this happens.
My colleagues could not explain it either.


sg3maiej · answer 1

Let's say we want to find out how weather affects visitor traffic at Acadia National Park.
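
The weather and visits sources themselves are not shown here; a minimal, hypothetical setup that matches the column names appearing in the plans below (the file paths are made up, and dv stands for the visits DataFrame that gets reused further down) could look like this:

scala> import org.apache.spark.sql.functions.col
scala> // hypothetical weather input: Date, MaxTemp, AverageTemp, MinTemp, Precip, Conditions
scala> spark.read.option("header", "true").option("inferSchema", "true").csv("/data/acadia/weather.csv").createOrReplaceTempView("weather")
scala> // hypothetical visits input: Date, VisitDuration
scala> val dv = spark.read.option("header", "true").option("inferSchema", "true").csv("/data/acadia/visits.csv")
scala> dv.createOrReplaceTempView("visits")

With the two views in place, we compute the average visit duration per day and join it with the weather data: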

scala> spark.sql("SET spark.sql.shuffle.partitions=10")
scala> val ds = spark.sql("SELECT Date, AVG(VisitDuration) AvgVisitDuration FROM visits GROUP BY Date")
scala> ds.createOrReplaceTempView("visit_stats")
scala> val dwv = spark.sql("SELECT /*+ MERGEJOIN(v) */  w.*, v.AvgVisitDuration FROM weather w JOIN visit_stats v ON w.Date = v.Date")
scala> dwv.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Date#91, MaxTemp#92, AverageTemp#93, MinTemp#94, Precip#95, Conditions#96, AvgVisitDuration#216]
   +- SortMergeJoin [Date#91], [Date#27], Inner
      :- Sort [Date#91 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(Date#91, 10), ENSURE_REQUIREMENTS, [id=#478]
      :     +- Filter isnotnull(Date#91)
      :        +- FileScan ...
      +- Sort [Date#27 ASC NULLS FIRST], false, 0
         +- HashAggregate(keys=[Date#27], functions=[avg(cast(VisitDuration#31 as double))])
            +- Exchange hashpartitioning(Date#27, 10), ENSURE_REQUIREMENTS, [id=#474]
               +- HashAggregate(keys=[Date#27], functions=[partial_avg(cast(VisitDuration#31 as double))])
                  +- Filter isnotnull(Date#27)
                     +- FileScan ...

Note that a) Spark decided to shuffle both datasets into 10 partitions, both to compute the averages and to perform the join, and b) the shuffle origin in both cases is ENSURE_REQUIREMENTS.
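
These ENSURE_REQUIREMENTS exchanges are inserted by the planner only to satisfy the distribution that the aggregation and the sort-merge join require, so their partition count simply follows the session default. As a quick sanity check (a sketch, reusing the views above), you can change the default and re-create the join; both exchanges should pick up the new value:

scala> spark.sql("SET spark.sql.shuffle.partitions=25")
scala> val dwv2 = spark.sql("SELECT /*+ MERGEJOIN(v) */ w.*, v.AvgVisitDuration FROM weather w JOIN visit_stats v ON w.Date = v.Date")
scala> dwv2.explain()
// both exchanges should now read: Exchange hashpartitioning(Date#.., 25), ENSURE_REQUIREMENTS, ...
scala> spark.sql("SET spark.sql.shuffle.partitions=10")   // set it back before continuing
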
Now suppose the visits dataset is very large, so we want more parallelism for the statistics computation and repartition it to a larger number of partitions.

scala> val dvr = dv.repartition(100,col("Date"))
scala> dvr.createOrReplaceTempView("visits_rep")
scala> val ds = spark.sql("SELECT Date, AVG(AvgDuration) AvgVisitDuration FROM visits_rep GROUP BY Date")
scala> ds.createOrReplaceTempView("visit_stats")
scala> val dwv = spark.sql("SELECT /*+ MERGEJOIN(v) */  w.*, v.AvgVisitDuration from weather w join visit_stats v on w.Date = v.Date")
scala> dwv.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Date#91, MaxTemp#92, AverageTemp#93, MinTemp#94, Precip#95, Conditions#96, AvgVisitDuration#231]
   +- SortMergeJoin [Date#91], [Date#27], Inner
      :- Sort [Date#91 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(Date#91, 100), ENSURE_REQUIREMENTS, [id=#531]
      :     +- Filter isnotnull(Date#91)
      :        +- FileScan ...
      +- Sort [Date#27 ASC NULLS FIRST], false, 0
         +- HashAggregate(keys=[Date#27], functions=[avg(cast(VisitDuration#31 as double))])
            +- HashAggregate(keys=[Date#27], functions=[partial_avg(cast(VisitDuration#31 as double))])
               +- Exchange hashpartitioning(Date#27, 100), REPARTITION_BY_NUM, [id=#524]
                  +- Filter isnotnull(Date#27)
                     +- FileScan ...

Here the REPARTITION_BY_NUM shuffle origin demands exactly 100 partitions, so Spark adjusted the other exchange, the one with the ENSURE_REQUIREMENTS origin, to use 100 partitions as well, and no additional shuffle is needed: the aggregate branch no longer gets its own ENSURE_REQUIREMENTS exchange, because the explicit repartition by Date already satisfies the aggregation's distribution requirement.
This is just one simple example, but I believe Spark can apply many other optimizations to a DAG that contains shuffles with the ENSURE_REQUIREMENTS origin.
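
For comparison, repartitioning by column without an explicit number is reported with yet another shuffle origin. A sketch, reusing the hypothetical dv from above:

scala> val dvc = dv.repartition(col("Date"))     // no partition count given
scala> dvc.createOrReplaceTempView("visits_rep_col")
scala> spark.sql("SELECT Date, AVG(VisitDuration) AvgVisitDuration FROM visits_rep_col GROUP BY Date").explain()
// the Exchange in this branch should now show ... REPARTITION_BY_COL, and its partition
// count falls back to spark.sql.shuffle.partitions, since no number was pinned

Unlike REPARTITION_BY_NUM, the REPARTITION_BY_COL origin does not pin the partition count, so the planner (and AQE) keep more freedom to adjust it.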
