在yarn中启动/停止spark流作业的正确方法是什么?

yebdmbv4  于 2021-06-02  发布在  Hadoop
关注(0)|答案(4)|浏览(1504)

我已经尝试和谷歌搜索了好几个小时,没有运气。
我有一个spark流应用程序,在本地spark集群中运行良好。现在我需要在Cloudera5.4.4上部署它。我需要能够启动它,让它在后台不断运行,并能够停止它。
我试过这个:

$ spark-submit --master yarn-cluster --class MyMain my.jar myArgs

但它只是无休止地打印这些线条。

15/07/28 17:58:18 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING)
15/07/28 17:58:19 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING)

问题1:因为它是一个流应用程序,所以它需要连续运行。那么如何在“后台”模式下运行它呢?我能找到的所有在yarn上提交spark作业的例子似乎都假设应用程序将完成一些工作并终止,因此您希望在前台运行它。但流媒体并非如此。
下一个。。。在这一点上,应用程序似乎没有发挥作用。我想这可能是我的一个bug或配置错误,所以我试着查看日志,看看发生了什么:

$ yarn logs -applicationId application_1438092860895_012

但它告诉我:

/tmp/logs/hdfs/logs/application_1438092860895_0012does not have any log files.

所以问题2:如果应用程序正在运行,为什么它没有日志文件?
所以最终我不得不杀了它:

$ yarn application -kill application_1438092860895_012

这就引出了第3个问题:假设我最终可以启动应用程序并在后台运行,“yarn application-kill”是停止它的首选方法吗?

xyhw6mcr

xyhw6mcr1#

你可以关闭 spark-submit 慰问。写出运行状态时,作业已在后台运行。
日志在应用程序完成后立即可见。在运行时,所有日志都可以在本地的工作节点上直接访问(您可以在yarn resource manager web ui上看到),并在作业完成后聚合到hdfs。 yarn application -kill 可能是阻止spark流媒体应用程序的最佳方法,但它并不完美。最好做一些优雅的关闭来停止所有流接收器和流上下文,但我个人不知道怎么做。

lmyy7pcs

lmyy7pcs2#

我终于想出了一个安全关闭spark流媒体作业的方法。
写一个socket服务器线程等待停止流上下文

package xxx.xxx.xxx

    import java.io.{BufferedReader, InputStreamReader}
    import java.net.{ServerSocket, Socket}

    import org.apache.spark.streaming.StreamingContext

    object KillServer {

      class NetworkService(port: Int, ssc: StreamingContext) extends Runnable {
        val serverSocket = new ServerSocket(port)

        def run() {
          Thread.currentThread().setName("Zhuangdy | Waiting for graceful stop at port " + port)
          while (true) {
            val socket = serverSocket.accept()
            (new Handler(socket, ssc)).run()
          }
        }
      }

      class Handler(socket: Socket, ssc: StreamingContext) extends Runnable {
        def run() {
          val reader = new InputStreamReader(socket.getInputStream)
          val br = new BufferedReader(reader)
          if (br.readLine() == "kill") {
            ssc.stop(true, true)
          }
          br.close();
        }
      }

      def run(port:Int, ssc: StreamingContext): Unit ={
        (new NetworkService(port, ssc)).run
      }
    }

在你的 main 方法中,添加以下代码

ssc.start()
KillServer.run(11212, ssc)
ssc.awaitTermination()

write spark submit将作业提交给yarn,并将输出定向到稍后使用的文件

spark-submit --class "com.Mainclass" \        
            --conf "spark.streaming.stopGracefullyOnShutdown=true" \        
            --master yarn-cluster  --queue "root"  \        
            --deploy-mode cluster \
            --executor-cores 4 --num-executors 8 --executor-memory 3G \
            hdfs:///xxx.jar > output 2>&1 &

最后,安全地关闭spark流作业而不丢失数据或计算结果不会持久(用于优雅地停止流式处理上下文的服务器套接字正在驱动程序上运行,因此您grep步骤3的输出以获取驱动程序addr,并使用echo nc发送socket kill命令)


# !/bin/bash

    driver=`cat output | grep ApplicationMaster | grep -Po '\d+.\d+.\d+.\d+'`
    echo "kill" | nc $driver 11212
    driverid=`yarn application -list 2>&1 | grep ad.Stat | grep -Po 'application_\d+_\d+'`
    yarn application -kill $driverid
pieyvz9o

pieyvz9o3#

最后一个难题是如何以一种优雅的方式停止部署在yarn上的spark流应用程序。停止(或更确切地说是终止)Yarn应用的标准方法是使用命令 yarn application -kill [applicationId] . 这个命令停止spark流应用程序,但这可能发生在批处理的中间。因此,如果作业从kafka读取数据,将处理结果保存在hdfs上,并最终提交kafka偏移量,则在提交偏移量之前停止作业时,hdfs上应该会出现重复的数据。
解决优雅关机问题的第一个尝试是在关机挂钩中调用spark streaming context stop方法。

sys.addShutdownHook {
    streamingContext.stop(stopSparkContext = true, stopGracefully = true)
}

令人失望的是,关闭钩子调用太迟,无法完成启动的批处理,spark应用程序几乎立即被终止。此外,根本不能保证jvm会调用关闭钩子。
在写这篇博文的时候,唯一确定的方法是在yarn上优雅地关闭spark流应用程序,即通知应用程序计划的关闭,然后以编程方式(而不是从shutdown hook)停止流上下文。命令 yarn application -kill 仅当通知的应用程序在定义的超时后未停止时才应作为最后手段使用。
可以使用hdfs上的标记文件(最简单的方法)或使用驱动程序上公开的简单socket/http端点(复杂的方法)通知应用程序计划的关闭。
因为我喜欢kiss原理,下面您可以找到使用标记文件启动/停止spark流应用程序的shell脚本伪代码:

start() {
    hdfs dfs -touchz /path/to/marker/my_job_unique_name
    spark-submit ...
}

stop() {
    hdfs dfs -rm /path/to/marker/my_job_unique_name
    force_kill=true
    application_id=$(yarn application -list | grep -oe "application_[0-9]*_[0-9]*"`)
    for i in `seq 1 10`; do
        application_status=$(yarn application -status ${application_id} | grep "State : \(RUNNING\|ACCEPTED\)")
        if [ -n "$application_status" ]; then
            sleep 60s
        else
            force_kill=false
            break
        fi
    done
    $force_kill && yarn application -kill ${application_id}
}

在spark流应用程序中,后台线程应该监视标记文件,当文件消失时停止上下文调用

streamingContext.stop(stopSparkContext = true, stopGracefully = true)

你也可以参考http://blog.parseconsulting.com/2017/02/how-to-shutdown-spark-streaming-job.html

z4bn682m

z4bn682m4#

你的数据来源是什么?如果它是可靠的,像Kafka直接接收器,纱杀关机应该是罚款。当应用程序重新启动时,它将读取最后一个完整的批处理偏移量。如果数据源不可靠,或者您想自己处理一个优雅的关闭,那么您必须在流上下文上实现某种外部钩子。我也遇到了同样的问题,最后我实现了一个小技巧,在webui中添加了一个新的选项卡,作为停止按钮。

相关问题