我对集群上的执行流有一个奇怪的问题。
方法a调用方法b,方法b在foreach中调用方法c
执行流程应该是
Method A --> Method B --> Method C
但它的工作原理是这样的:
1) Method A --> Method B (skips Method C) and continues to rest of Method B.
2) Method C is executed later separately.
因为流量不正确, accum1.value
在方法b中显示为 blank/null
.
**CLASS A::METHOD A:**
object TakeDFs {
def takeDFs(df: DataFrame): Unit = {
println("---------------- takeNettedDFs::START ---------------- ")
for(i <- 0 until bySecurityArray.length) {
allocProcessDF = bySecurityArray(i).toDF()
....
//WORKS
AllocOneProcess.getAllocOneDFs(allocProcessDF)
}
println("---------------- takeNettedDFs::END ---------------- ")
}
}
**CLASS B::METHOD B:**
object AllocOneProcess {
def getAllocOneDFs(df: DataFrame): Unit = {
println("---------------- getAllocOneDFs::START ---------------- ")
df.coalesce(1).sort($"PRIORITY" asc).foreach( {
row => AllocOneTest.allocProcessTest(row)
})
println("------------- getAllocOneDFs::accum1.value -------------" + accum1.value)
println("---------------- getAllocOneDFs::END ---------------- ")
}
}
**CLASS C::METHOD C:**
object AllocOneTest {
def allocProcessTest(row: Row): Unit = {
println("---------------- AllocOneTest::allocProcessTest::START ---------------- ")
accum1.add(RegRptPilotConstants.PairProcessCaseClass(row(0).asInstanceOf[String], row(1).asInstanceOf[String], row(2).asInstanceOf[String]))
println("---------------- AllocationOneTest::allocProcessTest::END ---------------- ")
}
}
**CLASS D::**
object RegRptPilotConstants {
var pairedOneSeq = Seq[PairProcessCaseClass]()
val accum1 = new ProcessAccumulator[ProcessCaseClass]()
}
1条答案
按热度按时间aiqt4smr1#
对于上面的代码,foreach是触发spark dag以执行整个流的操作。动作,即上面场景中的foreach在每个分区上同时并行执行/调用,从而最终调用foreach内部的方法。
流程:方法a-->方法b-->方法c,方法c,方法c。。。
参考文献:https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions