Scala: SparkAppHandle listener is never invoked

km0tfn4u asked on 2023-01-17 in Scala

I'm trying to submit a Spark 2.3 Scala job to a Kubernetes cluster, using the Play framework.
I have also tried it as a simple Scala program, without the Play framework.
The job gets submitted to the k8s cluster, but stateChanged and infoChanged are never invoked. I would also like to be able to get handle.getAppId.
I'm submitting the job with spark-submit, as described here:

$ bin/spark-submit \
    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.container.image=<spark-image> \
    local:///path/to/examples.jar

Here is the code for the job:

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
import scala.util.control.NonFatal

def index = Action {
  try {
    val spark = new SparkLauncher()
      .setMaster("my k8 apiserver host")
      .setVerbose(true)
      .addSparkArg("--verbose")
      .setMainClass("myClass")
      .setAppResource("hdfs://server/inputs/my.jar")
      .setConf("spark.app.name", "myapp")
      .setConf("spark.executor.instances", "5")
      .setConf("spark.kubernetes.container.image", "mydockerimage")
      .setDeployMode("cluster")
      .startApplication(new SparkAppHandle.Listener() {

        override def infoChanged(handle: SparkAppHandle): Unit = {
          println("Spark App Id [" + handle.getAppId
            + "] Info Changed. State [" + handle.getState + "]")
        }

        override def stateChanged(handle: SparkAppHandle): Unit = {
          println("Spark App Id [" + handle.getAppId
            + "] State Changed. State [" + handle.getState + "]")
          if (handle.getState.toString == "FINISHED") System.exit(0)
        }
      })

    Ok(spark.getState().toString())
  } catch {
    case NonFatal(e) =>
      println("failed with exception: " + e)
      Ok
  }
}
Answer 1 (mnemlml8):

Spark Launcher architecture overview

SparkLauncher lets you run a spark-submit command programmatically. startApplication() spawns spark-submit as a separate child process and communicates with it through a launcher server running inside your JVM. You need to wait in the client's main function until the driver has been launched in K8s and the listener callbacks arrive; otherwise the JVM's main thread exits, killing the client without reporting anything.

-----------------------                       -----------------------
|      User App       |     spark-submit      |      Spark App      |
|                     |  -------------------> |                     |
|         ------------|                       |-------------        |
|         |           |        hello          |            |        |
|         | L. Server |<----------------------| L. Backend |        |
|         |           |                       |            |        |
|         -------------                       -----------------------
|               |     |                              ^
|               v     |                              |
|        -------------|                              |
|        |            |      <per-app channel>       |
|        | App Handle |<------------------------------
|        |            |
-----------------------
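
The App Handle at the bottom left is what startApplication() returns. The launcher server fills it in asynchronously as messages arrive over the per-app channel, and besides the listener callbacks it exposes getAppId, getState, stop and kill. So, as long as the client JVM stays alive, the application id can also be obtained by polling. A minimal sketch of such a helper (awaitAppId and its timeout are illustrative, not part of the launcher API):

import org.apache.spark.launcher.SparkAppHandle

object HandleUtil {
  // Poll the handle until the driver has reported an application id, or until
  // the app reaches a final state or the timeout expires. Right after
  // startApplication() returns, getAppId is still null.
  def awaitAppId(handle: SparkAppHandle, timeoutMs: Long = 60000L): Option[String] = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (handle.getAppId == null && !handle.getState.isFinal &&
           System.currentTimeMillis() < deadline) {
      Thread.sleep(500) // the launcher server updates the handle in the background
    }
    Option(handle.getAppId)
  }
}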

Solution
I added a j.u.c.CountDownLatch so that the main thread cannot exit until the application state reaches appState.isFinal.

import java.util.concurrent.CountDownLatch
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

// Named so that it does not shadow org.apache.spark.launcher.SparkLauncher.
object SparkLauncherMain {
  def main(args: Array[String]): Unit = {

    // Released by the listener once the application reaches a final state.
    val countDownLatch = new CountDownLatch(1)

    val launcher = new SparkLauncher()
      .setMaster("k8s://http://127.0.0.1:8001")
      .setAppResource("local:/{PATH}/spark-examples_2.11-2.3.0.jar")
      .setConf("spark.app.name", "spark-pi")
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setConf("spark.executor.instances", "5")
      .setConf("spark.kubernetes.container.image", "spark:spark-docker")
      .setConf("spark.kubernetes.driver.pod.name", "spark-pi-driver")
      .setDeployMode("cluster")
      .startApplication(new SparkAppHandle.Listener() {
        override def infoChanged(handle: SparkAppHandle): Unit = {}

        override def stateChanged(handle: SparkAppHandle): Unit = {
          val appState = handle.getState()
          println(s"Spark App Id [${handle.getAppId}] State Changed. State [$appState]")

          if (appState != null && appState.isFinal) {
            countDownLatch.countDown() // the spark driver has exited; release the latch
          }
        }
      })

    countDownLatch.await() // keep the main thread alive until the driver finishes
  }
}
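
In a Play controller the JVM stays alive anyway, because the server keeps running, so instead of blocking with a latch you can bridge the listener callback to the HTTP response with a Promise. A sketch of the index action from the question rewritten this way (the trimmed-down launcher config is the question's own; appIdPromise is an illustrative name and error handling is omitted):

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Promise

def index = Action.async {
  // Completed by the listener as soon as the driver reports its id.
  val appIdPromise = Promise[String]()

  new SparkLauncher()
    .setMaster("my k8 apiserver host")
    .setMainClass("myClass")
    .setAppResource("hdfs://server/inputs/my.jar")
    .setDeployMode("cluster")
    .startApplication(new SparkAppHandle.Listener() {
      override def infoChanged(handle: SparkAppHandle): Unit =
        if (handle.getAppId != null) appIdPromise.trySuccess(handle.getAppId)
      override def stateChanged(handle: SparkAppHandle): Unit =
        if (handle.getAppId != null) appIdPromise.trySuccess(handle.getAppId)
    })

  appIdPromise.future.map(appId => Ok(s"Spark App Id [$appId] submitted"))
}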
