如何从java中知道flink作业的状态?

dgjrabp2  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(543)

我有一个作业正在运行,我想只使用一次恢复重试,因为在此期间,flink重新启动没有触发我有一个线程尝试解决问题,然后当问题解决后,flink将重新启动,但有时线程需要更长的时间来修复问题,并触发重新启动策略,由于问题仍然失败,然后作业停止,但线程可能还有另一个迭代,然后应用程序永远不会死,因为我将它作为jar应用程序运行。所以,我的问题是:
是否还有其他方法可以从java代码中知道作业的状态?类似于(jobstatus.canceled==true)。
提前谢谢!谨致问候

fhity93d

fhity93d1#

非常感谢菲利佩。这正是我所需要的,多亏了你,一切都完成了。我在这里共享代码,以防其他人需要。
让听众做好准备

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(...);
final AtomicReference<JobID> jobIdReference = new AtomicReference<>();
//Environment configurations
env.registerJobListener(new JobListener() {
    @Override
    public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
        assert jobClient != null;
        jobIdReference.set(jobClient.getJobID());
        jobClient = jobClient /*jobClient static public object in the main class*/;
    }@Override
    public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
        assert jobExecutionResult != null;
        jobExecutionResult.notify();
    }
});

使用代码:

Preconditions.checkNotNull(jobClient);
    final String status = jobClient.getJobStatus().get().name();
    if (status.equals(JobStatus.FAILED.name())) System.exit(1);

相关问题