在spark/scala中使用foreach时的执行流程

j9per5c4  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(518)

我对集群上的执行流有一个奇怪的问题。
方法a调用方法b,方法b在foreach中调用方法c
执行流程应该是

Method A --> Method B --> Method C

但它的工作原理是这样的:

1) Method A --> Method B (skips Method C) and continues to rest of Method B.
2) Method C is executed later separately.

因为流量不正确, accum1.value 在方法b中显示为 blank/null .


**CLASS A::METHOD A:**

object TakeDFs {

    def takeDFs(df: DataFrame): Unit = {
        println("---------------- takeNettedDFs::START ---------------- ")

        for(i <- 0 until bySecurityArray.length) {
            allocProcessDF = bySecurityArray(i).toDF()
            ....

            //WORKS
            AllocOneProcess.getAllocOneDFs(allocProcessDF)

            }
        println("---------------- takeNettedDFs::END ---------------- ")

    }
}

**CLASS B::METHOD B:**

object AllocOneProcess {

    def getAllocOneDFs(df: DataFrame): Unit = {
        println("---------------- getAllocOneDFs::START ---------------- ")

        df.coalesce(1).sort($"PRIORITY" asc).foreach( {
        row => AllocOneTest.allocProcessTest(row)
        })

        println("------------- getAllocOneDFs::accum1.value -------------" + accum1.value)

        println("---------------- getAllocOneDFs::END ---------------- ")

    }
}

**CLASS C::METHOD C:**

object AllocOneTest {

    def allocProcessTest(row: Row): Unit =  {
        println("---------------- AllocOneTest::allocProcessTest::START ---------------- ")

        accum1.add(RegRptPilotConstants.PairProcessCaseClass(row(0).asInstanceOf[String], row(1).asInstanceOf[String], row(2).asInstanceOf[String]))

        println("---------------- AllocationOneTest::allocProcessTest::END ---------------- ")

    }
}

**CLASS D::**

object RegRptPilotConstants {
    var pairedOneSeq = Seq[PairProcessCaseClass]()
    val accum1 = new ProcessAccumulator[ProcessCaseClass]()

}
aiqt4smr

aiqt4smr1#

对于上面的代码,foreach是触发spark dag以执行整个流的操作。动作,即上面场景中的foreach在每个分区上同时并行执行/调用,从而最终调用foreach内部的方法。
流程:方法a-->方法b-->方法c,方法c,方法c。。。

df.coalesce(1).sort($"PRIORITY" asc).foreach( {
    row => AllocOneTest.allocProcessTest(row)
})

参考文献:https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions

相关问题