转载:添Zookeeper connection loss leads to Flink job restart
看这个之前可以先看看:【Flink】Flink 报错 ResourceManager leader changed to new address null
Flink可以使用zookeeper来进行ha,而一般我们都会使用zookeeper的高级api架构curator来对zk进行通讯。在curator中引入了状态的概念,包括connected,reconnected,suspeneded,lost与read_only
,其中suspended是个有意思的状态,当因为网络抖动、机器繁忙、zk集群短暂无响应,都会导致curator将状态置为suspended.
而Flink对suspended采取了非常谨慎的处理,就是发现是suspended,则取消所有作业,进行restart,显得未免有些太敏感了
,其实这个时候往往zk也是ok的,相应的jm也是leader都没有问题。
好,我们再顺一下:
在发生zk connection loss的情况下,curator会设置suspended状态,在此状态下,curator会释放leader,flink在发现notleader之后则会revokeLeadership
,进而导致dispatcher会cancel掉所有的job,cancel的过程中flink会主动抛出异常。
虽然这样做没什么大的影响,因为其实如果connection很快恢复,作业也会很快被拉起,没有大碍,但看起来总是不好,zk连接随便的一个扰动,都可能导致job重启,所以就想把它改动。
在flink的ZooKeeperUtils.java通过CuratorFrameworkFactory来构造CuratorFramework时,通过connectionStateErrorPolicy将ConnectionStateErrorPolicy从StandardConnectionStateErrorPolicy更新为SessionConnectionStateErrorPolicy,前者将suspended和lost都作为error,后者只是将lost作为error,而只有发生error的时候才会取消leadership,所以如此设置之后,在进入suspended状态时,不在发生leadership的取消和重新选举。
代码如下
/** * todo: * 1. zkQuorum对应配置中的high-availability.zookeeper.quorum,即Zookeeper的地址 * 2. sessionTimeout对应配置中的high-availability.zookeeper.client.session-timeout, * 单位为毫秒,默认60000即一分钟,ZK会话的超时时间 * 3. connectionTimeout对应配置中的high-availability.zookeeper.client.connection-timeout, * 单位为毫秒,默认15000即15秒,ZK的连接超时时间 * 4. 重试策略为ExponentialBackoffRetry,从概率上来讲随着重试次数越来越多,重试间隔呈指数级 * 增长 * 4.1 retryWait对应配置中的high-availability.zookeeper.client.retry-wait, * 即基础的间隔时间 * 4.2 maxRetryAttempts对应配置中的high-availability.zookeeper.client.max-retry-attempts, * 即最大重试次数 * 5. rootWithNamespace由root和namespace(clusterId)拼成,root对应配置中的 * high-availability.zookeeper.path.root,默认为/flink, namespace对应配置中的 * high-availability.cluster-id, 在Yarn模式下也就是applicationId * 6. aclProvider默认使用DefaultACLProvider,相关的配置有zookeeper.sasl.disable * (默人false)和high-availability.zookeeper.client.acl(默认open) * * CheckpointreCoveryFactory * */
CuratorFramework cf =
CuratorFrameworkFactory.builder()
.connectString(zkQuorum)
.sessionTimeoutMs(sessionTimeout)
.connectionTimeoutMs(connectionTimeout)
.retryPolicy(new ExponentialBackoffRetry(retryWait, maxRetryAttempts))
// Curator prepends a '/' manually and throws an Exception if the
// namespace starts with a '/'.
.namespace(
rootWithNamespace.startsWith("/")
? rootWithNamespace.substring(1)
: rootWithNamespace)
.aclProvider(aclProvider)
// todo: 解决方法参考:https://www.cnblogs.com/029zz010buct/p/10946244.html
// 这行代码是自己加入的
.connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy())
.build();
cf.start();
优点:从整体的状态转换上进行了控制,优雅。
缺点:目前flink所引用的curator的版本为2.12.0,不支持设置policy,需要更新curator版本号,是否会带来其他问题,不可知。
测试:成功。
更改curator的版本为4.2.0,提交作业,restart zk,job没有重启,checkpoint正常进行。
在flink内部,在代码ZooKeeperLeaderElectionService.java中的notLeader方法中,在收到notleader的通知的时候,根据当前的状态是否是suspended进行相应的处理。
优点:不对flink的整体造成影响,更改在局部范围内可控。
缺点:由于curator对suspended的处理依旧,所以从curator的层面还是会发生取消leadership然后重新进行选举的情况,虽然这一切都不必要。
测试:失败
总而言之,如果不动curator的逻辑,只是在flink里改,这里的逻辑就会被改的难以理解,并且还无法成功。
目前的方案应对的场景是zk connection的短时间抖动,如果发生zk connection的长时间不可用,则tm和jm都会失败,这个也是应有之义。
另,
在flink中对curator的suspended状态起作用的还有一个地方,在ZooKeeperCheckpointIDCounter.java中有对suspended的判断,如果之前是suspended或者Lost,则flink就不会去zk上存取checkpoint的信息了。这里感觉是个坑,也需要改对suspended的策略。
外一篇,
zookeeper可以设置session timeout时间,但是不是你随便设置就会起作用,会有一个判断的过程。
SessionTimeOut的协商如下:
最终SessionTimeOut,必须在minSessionTimeOut和maxSessionTimeOut区间里,如果跨越上下界,则以跨越的上届或下界为准。
maxSessionTimeout没配置则
maxSessionTimeOut设置为 20 * tickTime
minSessionTimeOut没配置则 minSessionTimeOut设置为 2 * tickTime
也就是默认情况下, SessionTimeOut的合法范围为 4秒~40秒,默认配置中tickTime为2秒。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_21383435/article/details/122310599
内容来源于网络,如有侵权,请联系作者删除!