有时Spark会以低效的方式“优化”dataframe计划。考虑Spark 2.1中的以下示例(也可以在Spark 1.6中复制):
val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")
val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})
val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))
df_result
.coalesce(1)
.saveAsTable(tablename)
在这个例子中,我想在一个昂贵的 Dataframe 转换之后写一个文件(这只是一个演示问题的例子)。Spark将coalesce(1)
向上移动,这样UDF只应用于包含1个分区的dataframe,从而破坏了并行性(有趣的是repartition(1)
没有这样做)。
一般来说,当我想在转换的某个部分增加并行性,但之后减少并行性时,就会发生这种行为。
我发现了一个解决方法,它包括缓存 Dataframe ,然后触发 Dataframe 的完整评估:
val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")
val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})
val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))
.cache
df_result.rdd.count // trigger computation
df_result
.coalesce(1)
.saveAsTable(tablename)
我的问题是:在这种情况下,有没有其他方法可以告诉Spark不要降低并行度?
1条答案
按热度按时间csbfibhn1#
其实并不是因为SparkSQL的优化,SparkSQL并没有改变Coalesce运算符的位置,如执行的计划所示:
我引用coalesce API's description的一段话:
注:本段由jira SPARK-19399添加。所以它不应该在2.0的API中找到。
然而,如果你正在做一个激烈的合并,例如。到numPartitions = 1,这可能会导致您的计算发生在比您喜欢的更少的节点上(例如在numPartitions = 1的情况下为一个节点)。为了避免这种情况,您可以调用重新分区。这将增加一个shuffle步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么)。
API并不执行shuffle,但会导致前一个RDD和当前RDD之间的狭窄依赖性。由于RDD是惰性计算,因此计算实际上是使用合并的分区完成的。
为了防止它,你应该使用repartition API。