我的情况是我想停止/取消代码中的flink作业。这是在我的集成测试中,我向我的flink工作提交一个任务并检查结果。当作业异步运行时,即使测试失败/通过,它也不会停止。我想在考试结束后停止工作。
我试过以下几件事:
找jobmanager演员
获取正在运行的作业
对于每个正在运行的作业,向作业管理器发送一个取消请求
当然,这是因为没有运行,但我不确定jobmanager actorref是错误的还是缺少了其他内容。
我得到的错误是:[flink akka.actor.default-dispatcher-5][akka://flink/user/jobmanager_1]来自actor的消息[org.apache.flink.runtime.messages.jobmanagermessages$requestrunningjobsstatus$][akka://flink/temp/$a]给演员[akka://flink/user/jobmanager_1]未交付[1] 遇到死信。可以使用配置设置“akka.log死信”和“akka.log关机时死信”关闭或调整此日志记录
这意味着要么作业管理器actor ref错误,要么发送给它的消息不正确。
代码如下所示:
val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) //I debugged to get this path
val jobManager = system.actorSelection("/user/jobmanager_1") //also got this akka path by debugging and getting the jobmanager akka url
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new FiniteDuration(10000, TimeUnit.MILLISECONDS))
try {
val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS))
if(result.isInstanceOf[RunningJobsStatus]){
val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages()
val itr = runningJobs.iterator()
while(itr.hasNext){
val jobId = itr.next().getJobId
val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)));
try {
Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
}
catch {
case e : Exception =>"Canceling the job with ID " + jobId + " failed." + e
}
}
}
}
catch{
case e : Exception => "Could not retrieve running jobs from the JobManager." + e
}
}
有人能检查一下这是不是正确的方法吗?
编辑:要完全停止作业,必须先停止taskmanager,然后停止jobmanager。
1条答案
按热度按时间yftpprvb1#
你在创造一个新的世界
ActorSystem
然后试着找一个有这个名字的演员/user/jobmanager_1
在同一个演员系统中。这是行不通的,因为实际的作业管理器将以不同的方式运行ActorSystem
.如果你想获得
ActorRef
对真正的工作经理来说,你要么用同样的方法ActorSystem
对于选择(然后您可以使用本地地址),或者您已经找到作业管理器参与者的远程地址。远程地址的格式为akka.tcp://flink@[address_of_actor_system]/user/jobmanager_[instance_number]
. 如果你有权访问FlinkMiniCluster
然后你可以用leaderGateway
答应获得现任领导的ActorGateway
.