如何确保flink作业已完成执行,然后执行某些任务

z3yyvxxp  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(406)

我想在flink作业完成后执行一些任务,我在intellij中运行代码时没有任何问题,但是在shell文件中运行flink jar时有一些问题。我使用下面的行来确保flink程序的执行是完整的

//start the execution

JobExecutionResult jobExecutionResult = envrionment.execute(" Started the execution ");

 is_job_finished = jobExecutionResult.isJobExecutionResult();

我不确定,上面的检查是否正确?
然后我使用上面的变量在下面的方法来执行一些任务

if(print_mode && is_job_finished){

        System.out.println(" \n \n -- System related  variables  -- \n");

        System.out.println(" Stream_join Window length = " + WindowLength_join__ms + " milliseconds");
        System.out.println(" Input rate for stream RR  = " + input_rate_rr_S + " events/second");
        System.out.println("Stream RR Runtime = " + Stream_RR_RunTime_S + " seconds");
        System.out.println(" # raw events in stream RR  = " + Total_Number_Of_Events_in_RR + "\n");

}

有什么建议吗?

1l5u6lss

1l5u6lss1#

您可以将作业侦听器注册到执行环境。
例如

env.registerJobListener(new JobListener {
      //Callback on job submission.
      override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {
        if (throwable == null) {
          log.info("SUBMIT SUCCESS")
        } else {
          log.info("FAIL")
        }
      }
    //Callback on job execution finished, successfully or unsuccessfully.
      override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {

        if (throwable == null) {
          log.info("SUCCESS")
        } else {
          log.info("FAIL")
        }
      }
    })
n8ghc7c1

n8ghc7c12#

将joblistener注册到您的streamexecutionenvironment。

相关问题