从代码中取消apache flink作业

rjzwgtxy  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(491)

我的情况是我想停止/取消代码中的flink作业。这是在我的集成测试中,我向我的flink工作提交一个任务并检查结果。当作业异步运行时,即使测试失败/通过,它也不会停止。我想在考试结束后停止工作。
我试过以下几件事:
找jobmanager演员
获取正在运行的作业
对于每个正在运行的作业,向作业管理器发送一个取消请求
当然,这是因为没有运行,但我不确定jobmanager actorref是错误的还是缺少了其他内容。
我得到的错误是:[flink akka.actor.default-dispatcher-5][akka://flink/user/jobmanager_1]来自actor的消息[org.apache.flink.runtime.messages.jobmanagermessages$requestrunningjobsstatus$][akka://flink/temp/$a]给演员[akka://flink/user/jobmanager_1]未交付[1] 遇到死信。可以使用配置设置“akka.log死信”和“akka.log关机时死信”关闭或调整此日志记录
这意味着要么作业管理器actor ref错误,要么发送给它的消息不正确。
代码如下所示:

val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) //I debugged to get this path
 val jobManager = system.actorSelection("/user/jobmanager_1") //also got this akka path by debugging and getting the jobmanager akka url
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new FiniteDuration(10000, TimeUnit.MILLISECONDS))
    try {
      val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS))
      if(result.isInstanceOf[RunningJobsStatus]){
        val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages()
        val itr = runningJobs.iterator()
        while(itr.hasNext){
          val jobId = itr.next().getJobId
          val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)));
          try {
            Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
          }
          catch {
            case e : Exception =>"Canceling the job with ID " + jobId + " failed." + e
          }

        }
      }
    }
    catch{
      case e : Exception => "Could not retrieve running jobs from the JobManager." + e
    }

  }

有人能检查一下这是不是正确的方法吗?
编辑:要完全停止作业,必须先停止taskmanager,然后停止jobmanager。

yftpprvb

yftpprvb1#

你在创造一个新的世界 ActorSystem 然后试着找一个有这个名字的演员 /user/jobmanager_1 在同一个演员系统中。这是行不通的,因为实际的作业管理器将以不同的方式运行 ActorSystem .
如果你想获得 ActorRef 对真正的工作经理来说,你要么用同样的方法 ActorSystem 对于选择(然后您可以使用本地地址),或者您已经找到作业管理器参与者的远程地址。远程地址的格式为 akka.tcp://flink@[address_of_actor_system]/user/jobmanager_[instance_number] . 如果你有权访问 FlinkMiniCluster 然后你可以用 leaderGateway 答应获得现任领导的 ActorGateway .

相关问题