Spark驱动程序同步执行

khbbv19g  于 2022-11-16  发布在  Apache
关注(0)|答案(2)|浏览(182)

据我所知,Spark使用lazy evaluation来执行实际的计算。只有当一个操作在一个转换链上被调用时,该转换链才会在执行器上执行。所有这些都发生在driver上,这是一个单线程进程。默认情况下,代码中声明的操作顺序总是被保留。
我的问题是:驱动程序是否通过等待触发的执行完成来执行同步调用,然后再继续下一个代码指令?换句话说:驱动程序调度指令是否阻塞?
下面是一个仅使用Spark API调用的示例:

spark
    .read
    .schema(mySchema)
    .json(myFilePath)
    .withColumn("a", col("b") * 2)
    .filter(col("c") > 300)
    .count()

spark
    .read
    .schema(mySchema2)
    .json(myFilePath2)
    .filter(col("d") < 100)
    .count()

在这里,默认情况下计划并执行两个操作,以确保它们的声明顺序相同。
Scala语句的另一个示例:

val df1 = spark
    .read
    .schema(mySchema)
    .json(myFilePath)
    .withColumn("a", col("b") * 2)
    .filter(col("c") > 300)

// no execution happened until here

val df1Count = df1.count()    // "count" action triggers the execution

println(s"df1 contains ${df1Count} rows.").  // rows are logged correctly

如果df1Count包含在群集上执行的结果,驱动程序是否等待完成执行后才调用println语句?
我错过了什么吗?我想知道更多关于它,所以一些官方文档或博客文章会有帮助。

gmxoilav

gmxoilav1#

是的,驾驶员负责生成逻辑和物理计划。
因此,驱动程序正在“等待”action(这里是count),以创建逻辑计划,并将工作发送给执行器。
然后,你的scala代码等待执行器响应,并执行其他代码行。
顺便说一句,这是可能的并行执行代码:请参见https://medium.com/analytics-vidhya/boosting-apache-spark-application-by-running-multiple-parallel-jobs-25d13ee7d2a6

object ParallelProcessing {

  val queries: List[(String, String)] = List(
    ("SELECT * FROM ABC", "output1"), 
    ("SELECT * FROM XYZ", "output2")
  )

  // Just use parallel collection instead of futures, that's it
  queries.par foreach { 
    case (query, path) =>
      val dataPath = s"${pathPrefix}/{path}"
      executeAndSave(query, dataPath)
  }
  
  def executeAndSave(query: String, dataPath: String)(implicit context: Context): Unit = {
    println(s"$query starts")
    context.spark.sql(query).write.mode("overwrite").parquet(dataPath)
    println(s"$query completes")
  }
  
}
umuewwlo

umuewwlo2#

在您的示例中,withColumnfilter操作都是 transformations,因此它们的求值是延迟的。count操作是触发这些转换执行的 action。与转换不同,操作是同步执行的。
动作和转换在spark文档中讨论(文档的这一部分讨论RDD,但同样适用于 Dataframe )。

相关问题