如果托管的spark会话死亡,则获得通知

h4cxqtbf  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(279)

我有一个长期运行的python3.6应用程序,它在yarn上托管pyspark2.4.6会话。如果spark会话崩溃/死亡,我希望能够得到通知/调用一个方法,这样我就可以自动重新启动它。
我希望主动地这样做,而不是 Package 对会话的每个调用并捕捉会话已关闭的错误,然后让用户在会话重新启动时等待。

yftpprvb

yftpprvb1#

使用https://stackoverflow.com/a/44084038/328968 首先,您可以创建一个侦听器并将其添加到会话中。当应用程序结束时,可以执行回调以重新启动应用程序。
sparklistener的定义见上述参考答案。

class SparkApplicationEndListener(SparkListener):
    def __init__(self, applicationEndCallback):
        self.applicationEndCallback = applicationEndCallback

    def onApplicationEnd(self, applicationEnd):
        if self.applicationEndCallback != None:
            self.applicationEndCallback(applicationEnd)

######## 

def handleSparkApplicationEnd(app_end):
    print(app_end.toString())
    sparkSession = buildSparkSession()

def buildSparkSession():
    #......
    sparkSession.sparkContext._gateway.start_callback_server()
    listener = SparkApplicationEndListener(handleSparkApplicationEnd)
    sparkSession.sparkContext._jsc.sc().addSparkListener(listener)
luaexgnf

luaexgnf2#

有一个restapi可用于yarn。您可以每隔一段时间查询当前运行的所有应用程序

http://rm-http-address:port/ws/v1/cluster/apps?states=RUNNING

然后检查pyspark会话是否是结果的一部分。

相关问题