我有一个长期运行的python3.6应用程序,它在yarn上托管pyspark2.4.6会话。如果spark会话崩溃/死亡,我希望能够得到通知/调用一个方法,这样我就可以自动重新启动它。我希望主动地这样做,而不是 Package 对会话的每个调用并捕捉会话已关闭的错误,然后让用户在会话重新启动时等待。
yftpprvb1#
使用https://stackoverflow.com/a/44084038/328968 首先,您可以创建一个侦听器并将其添加到会话中。当应用程序结束时,可以执行回调以重新启动应用程序。sparklistener的定义见上述参考答案。
class SparkApplicationEndListener(SparkListener): def __init__(self, applicationEndCallback): self.applicationEndCallback = applicationEndCallback def onApplicationEnd(self, applicationEnd): if self.applicationEndCallback != None: self.applicationEndCallback(applicationEnd) ######## def handleSparkApplicationEnd(app_end): print(app_end.toString()) sparkSession = buildSparkSession() def buildSparkSession(): #...... sparkSession.sparkContext._gateway.start_callback_server() listener = SparkApplicationEndListener(handleSparkApplicationEnd) sparkSession.sparkContext._jsc.sc().addSparkListener(listener)
luaexgnf2#
有一个restapi可用于yarn。您可以每隔一段时间查询当前运行的所有应用程序
http://rm-http-address:port/ws/v1/cluster/apps?states=RUNNING
然后检查pyspark会话是否是结果的一部分。
2条答案
按热度按时间yftpprvb1#
使用https://stackoverflow.com/a/44084038/328968 首先,您可以创建一个侦听器并将其添加到会话中。当应用程序结束时,可以执行回调以重新启动应用程序。
sparklistener的定义见上述参考答案。
luaexgnf2#
有一个restapi可用于yarn。您可以每隔一段时间查询当前运行的所有应用程序
然后检查pyspark会话是否是结果的一部分。