据我所知,Spark使用lazy evaluation来执行实际的计算。只有当一个操作在一个转换链上被调用时,该转换链才会在执行器上执行。所有这些都发生在driver上,这是一个单线程进程。默认情况下,代码中声明的操作顺序总是被保留。
我的问题是:驱动程序是否通过等待触发的执行完成来执行同步调用,然后再继续下一个代码指令?换句话说:驱动程序调度指令是否阻塞?
下面是一个仅使用Spark API调用的示例:
spark
.read
.schema(mySchema)
.json(myFilePath)
.withColumn("a", col("b") * 2)
.filter(col("c") > 300)
.count()
spark
.read
.schema(mySchema2)
.json(myFilePath2)
.filter(col("d") < 100)
.count()
在这里,默认情况下计划并执行两个操作,以确保它们的声明顺序相同。
Scala语句的另一个示例:
val df1 = spark
.read
.schema(mySchema)
.json(myFilePath)
.withColumn("a", col("b") * 2)
.filter(col("c") > 300)
// no execution happened until here
val df1Count = df1.count() // "count" action triggers the execution
println(s"df1 contains ${df1Count} rows."). // rows are logged correctly
如果df1Count
包含在群集上执行的结果,驱动程序是否等待完成执行后才调用println
语句?
我错过了什么吗?我想知道更多关于它,所以一些官方文档或博客文章会有帮助。
2条答案
按热度按时间gmxoilav1#
是的,驾驶员负责生成逻辑和物理计划。
因此,驱动程序正在“等待”
action
(这里是count
),以创建逻辑计划,并将工作发送给执行器。然后,你的scala代码等待执行器响应,并执行其他代码行。
顺便说一句,这是可能的并行执行代码:请参见https://medium.com/analytics-vidhya/boosting-apache-spark-application-by-running-multiple-parallel-jobs-25d13ee7d2a6:
umuewwlo2#
在您的示例中,
withColumn
和filter
操作都是 transformations,因此它们的求值是延迟的。count
操作是触发这些转换执行的 action。与转换不同,操作是同步执行的。动作和转换在spark文档中讨论(文档的这一部分讨论RDD,但同样适用于 Dataframe )。