我知道hadooprestapi通过程序提供了对作业状态的访问。类似地,有没有办法在程序中获得spark作业状态?
velaa5lx1#
spark ui上有一个(n)(几乎)未记录的restapi特性,它提供了有关作业和性能的度量。您可以通过以下方式访问:
http://<driverHost>:<uiPort>/metrics/json/
(uiport默认为4040)
gojuced72#
为java提供答案。在scala中,仅仅使用sparkcontext而不是javasparkcontext几乎是相似的。假设您有一个javasparkcontext:
private final JavaSparkContext sc;
以下代码允许从“作业”和“阶段”选项卡获取所有可用信息:
JavaSparkStatusTracker statusTracker = sc.statusTracker(); for(int jobId: statusTracker.getActiveJobIds()) { SparkJobInfo jobInfo = statusTracker.getJobInfo(jobId); log.info("Job " + jobId + " status is " + jobInfo.status().name()); log.info("Stages status:"); for(int stageId: jobInfo.stageIds()) { SparkStageInfo stageInfo = statusTracker.getStageInfo(stageId); log.info("Stage id=" + stageId + "; name = " + stageInfo.name() + "; completed tasks:" + stageInfo.numCompletedTasks() + "; active tasks: " + stageInfo.numActiveTasks() + "; all tasks: " + stageInfo.numTasks() + "; submission time: " + stageInfo.submissionTime()); } }
不幸的是,其他所有内容都只能从scalaspark上下文访问,因此使用java提供的结构可能会有一些困难。池列表:sc.sc().getallpools()执行器内存状态:sc.sc().getexecutormemorystatus()执行器ID:sc.sc().getexecutorids()存储信息:sc.sc().getrddstorageinfo()... 你可以试着找到更多有用的信息。
yc0p9oo03#
您也可以不使用spark job history server获取spark job状态。您可以使用sparklauncher 2.0.1(甚至spark 1.6版本也可以)从java程序启动spark作业:
SparkAppHandle appHandle = sparkLauncher.startApplication();
也可以将侦听器添加到startapplication()方法:
SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener);
其中listener有两个方法可以通知您作业状态的变化和信息的变化。我使用countdownlatch实现了,它按预期工作。这适用于sparklauncher版本2.0.1,也适用于Yarn簇模式。
... final CountDownLatch countDownLatch = new CountDownLatch(1); SparkAppListener sparkAppListener = new SparkAppListener(countDownLatch); SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener); Thread sparkAppListenerThread = new Thread(sparkAppListener); sparkAppListenerThread.start(); long timeout = 120; countDownLatch.await(timeout, TimeUnit.SECONDS); ... private static class SparkAppListener implements SparkAppHandle.Listener, Runnable { private static final Log log = LogFactory.getLog(SparkAppListener.class); private final CountDownLatch countDownLatch; public SparkAppListener(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void stateChanged(SparkAppHandle handle) { String sparkAppId = handle.getAppId(); State appState = handle.getState(); if (sparkAppId != null) { log.info("Spark job with app id: " + sparkAppId + ",\t State changed to: " + appState + " - " + SPARK_STATE_MSG.get(appState)); } else { log.info("Spark job's state changed to: " + appState + " - " + SPARK_STATE_MSG.get(appState)); } if (appState != null && appState.isFinal()) { countDownLatch.countDown(); } } @Override public void infoChanged(SparkAppHandle handle) {} @Override public void run() {} }
hwamh0ep4#
它与restapi不同,但是您可以通过注册 SparkListener 与 SparkContext.addSparkListener . 它是这样的:
SparkListener
SparkContext.addSparkListener
sc.addSparkListener(new SparkListener { override def onStageCompleted(event: SparkListenerStageCompleted) = { if (event.stageInfo.stageId == myStage) { println(s"Stage $myStage is done.") } } })
pb3skfrl5#
有一个(n)(几乎)未记录的RESTAPI特性,它提供了您在spark ui上看到的几乎所有内容:
http://<sparkMasterHost>:<uiPort>/api/v1/...
对于本地安装,您可以从这里开始:
http://localhost:8080/api/v1/applications
您可以在这里找到可能的终点:https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/api/v1/apirootresource.scala
5条答案
按热度按时间velaa5lx1#
spark ui上有一个(n)(几乎)未记录的restapi特性,它提供了有关作业和性能的度量。
您可以通过以下方式访问:
(uiport默认为4040)
gojuced72#
为java提供答案。在scala中,仅仅使用sparkcontext而不是javasparkcontext几乎是相似的。
假设您有一个javasparkcontext:
以下代码允许从“作业”和“阶段”选项卡获取所有可用信息:
不幸的是,其他所有内容都只能从scalaspark上下文访问,因此使用java提供的结构可能会有一些困难。
池列表:sc.sc().getallpools()
执行器内存状态:sc.sc().getexecutormemorystatus()
执行器ID:sc.sc().getexecutorids()
存储信息:sc.sc().getrddstorageinfo()
... 你可以试着找到更多有用的信息。
yc0p9oo03#
您也可以不使用spark job history server获取spark job状态。您可以使用sparklauncher 2.0.1(甚至spark 1.6版本也可以)从java程序启动spark作业:
也可以将侦听器添加到startapplication()方法:
其中listener有两个方法可以通知您作业状态的变化和信息的变化。
我使用countdownlatch实现了,它按预期工作。这适用于sparklauncher版本2.0.1,也适用于Yarn簇模式。
hwamh0ep4#
它与restapi不同,但是您可以通过注册
SparkListener
与SparkContext.addSparkListener
. 它是这样的:pb3skfrl5#
有一个(n)(几乎)未记录的RESTAPI特性,它提供了您在spark ui上看到的几乎所有内容:
对于本地安装,您可以从这里开始:
您可以在这里找到可能的终点:https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/api/v1/apirootresource.scala