flink ha群集作业管理器问题

eit6fx6z  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(406)

我有一个Flink1.2集群的设置,由3个JobManager和2个TaskManager组成。我从jobmanager1启动zookeeper仲裁,确认zookeeper在其他两个JobManager上启动,然后在此jobmanager1上启动一个flink作业。
flink-conf.yaml在所有5个虚拟机上都是相同的,这意味着jobmanager.rpc.address:指向jobmanager1。
如果我关闭运行jobmanager1的vm,我希望zookeeper说剩下的jobmanagers中的一个是领导者,taskmanagers应该重新连接到它。相反,我在任务经理的日志中看到了很多这样的信息

2017-03-14 14:13:21,827 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Trying to register at JobManager akka.tcp://flink@1.2.3.4:43660/user/jobmanager (attempt 11, timeout: 30 seconds)
2017-03-14 14:13:21,836 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:43660] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:43660]] Caused by: [Connection refused: /1.2.3.4:43660]

为了保密,我将原来的ip修改为1.2.3.4,因为它总是相同的ip(属于jobmanager1)。
更多日志:

2017-03-15 10:28:28,655 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Flat Map (1/1)
2017-03-15 10:28:38,534 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
2017-03-15 10:28:46,606 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:28:52,431 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:29:02,435 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:29:10,489 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.
2017-03-15 10:29:10,490 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Cancelling all computations and discarding all cached data.
2017-03-15 10:29:10,491 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
2017-03-15 10:29:10,491 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.
    at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074)
    at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-03-15 10:29:10,512 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
2017-03-15 10:29:10,515 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04).
2017-03-15 10:29:10,515 INFO  org.apache.flink.runtime.taskmanager.Task                     - Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@1.2.3.4:44779/user/jobmanager: Old JobManager lost its leadership.
    at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074)
    at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-03-15 10:29:10,516 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04).
2017-03-15 10:29:10,516 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Disassociating from JobManager
2017-03-15 10:29:10,525 INFO  org.apache.flink.runtime.blob.BlobCache                       - Shutting down BlobCache
2017-03-15 10:29:10,542 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:29:10,546 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
2017-03-15 10:29:10,548 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04).
2017-03-15 10:29:10,551 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Flat Map (1/1)
2017-03-15 10:29:10,552 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Trying to register at JobManager akka.tcp://flink@1.2.3.5:43893/user/jobmanager (attempt 1, timeout: 500 milliseconds)
2017-03-15 10:29:10,567 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Source: Custom Source -> Flat Map (1/1)
2017-03-15 10:29:10,632 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Successful registration at JobManager (akka.tcp://flink@1.2.3.5:43893/user/jobmanager), starting network stack and library cache.
2017-03-15 10:29:10,633 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Determined BLOB server address to be /1.2.3.5:42830. Starting BLOB cache.
2017-03-15 10:29:10,633 INFO  org.apache.flink.runtime.blob.BlobCache                       - Created BLOB cache storage directory /tmp/blobStore-d97e08db-d2f1-4f00-a7d1-30c2f5823934
2017-03-15 10:29:15,551 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:29:20,571 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:29:25,582 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:29:30,592 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@1.2.3.4:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@1.2.3.4:44779]] Caused by: [Connection refused: /1.2.3.4:44779]

有人知道为什么taskmanagers没有尝试重新连接到剩余的jobmanagers之一(如上面的1.2.3.5)吗?
谢谢!

zvms9eto

zvms9eto1#

对于每个面临相同问题的人,ha要求您提供一个可以从所有节点访问的dfs位置。我让后端状态检查点目录和zookeeper存储目录在每个vm上指向一个本地文件系统位置,当其中一个jobmanager宕机时,新的负责人无法恢复正在运行的作业,因为缺少信息/位置不可访问。
编辑:因为有人问这个问题,所以我修改了文件(在ApacheFlink1.2的情况下)(https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/config.html))曾经

conf/flink-conf.yaml

我准备好了

state.backend.fs.checkpointdir
high-availability.zookeeper.storageDir

到aws s3路径。可从TaskManager和JobManager访问。

相关问题