scala 合并降低了整个级的平行度(Spark)

ktca8awb  于 2023-05-29  发布在  Scala
关注(0)|答案(1)|浏览(172)

有时Spark会以低效的方式“优化”dataframe计划。考虑Spark 2.1中的以下示例(也可以在Spark 1.6中复制):

val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")

val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})

val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))

df_result
.coalesce(1)
.saveAsTable(tablename)

在这个例子中,我想在一个昂贵的 Dataframe 转换之后写一个文件(这只是一个演示问题的例子)。Spark将coalesce(1)向上移动,这样UDF只应用于包含1个分区的dataframe,从而破坏了并行性(有趣的是repartition(1)没有这样做)。
一般来说,当我想在转换的某个部分增加并行性,但之后减少并行性时,就会发生这种行为。
我发现了一个解决方法,它包括缓存 Dataframe ,然后触发 Dataframe 的完整评估:

val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")

val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})

val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))
.cache

df_result.rdd.count // trigger computation

df_result
.coalesce(1)
.saveAsTable(tablename)

我的问题是:在这种情况下,有没有其他方法可以告诉Spark不要降低并行度?

csbfibhn

csbfibhn1#

其实并不是因为SparkSQL的优化,SparkSQL并没有改变Coalesce运算符的位置,如执行的计划所示:

Coalesce 1
+- *Project [value#2, UDF(value#2) AS udfResult#11]
   +- *SerializeFromObject [input[0, double, false] AS value#2]
      +- Scan ExternalRDDScan[obj#1]

我引用coalesce API's description的一段话:
注:本段由jira SPARK-19399添加。所以它不应该在2.0的API中找到。
然而,如果你正在做一个激烈的合并,例如。到numPartitions = 1,这可能会导致您的计算发生在比您喜欢的更少的节点上(例如在numPartitions = 1的情况下为一个节点)。为了避免这种情况,您可以调用重新分区。这将增加一个shuffle步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么)。
API并不执行shuffle,但会导致前一个RDD和当前RDD之间的狭窄依赖性。由于RDD是惰性计算,因此计算实际上是使用合并的分区完成的。
为了防止它,你应该使用repartition API。

相关问题