Zookeeper是一个分布式协调框架,提供分布式锁、配置项管理、服务注册与集群管理等功能。
为了保证Zookeeper的高可用,一般都会以集群的模式部署。
这个时候需要考虑各个节点的数据一致性,那么集群在启动时,需要先选举出一位Leader,再由Leader完成向其他节点的数据同步工作。
本文将是Zookeeper系列的第一篇文章,从源码角度讲述Zookeeper的选举算法。
博主是在windows安装了docker desktop,使用docker-compose启动zk集群的。docker-compose.yml内容如下:
version: '2.2'
services:
zoo1:
image: zookeeper:3.4.14
restart: always
hostname: zoo1
ports:
- 2181:2181
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
zoo2:
image: zookeeper:3.4.14
restart: always
hostname: zoo2
ports:
- 2182:2181
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
zoo3:
image: zookeeper:3.4.14
restart: always
hostname: zoo3
ports:
- 2183:2181
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
其中2181是用于客户端连接的端口,这里分别映射到了主机的三个端口上
ZOO_MY_ID代表节点id,需要手动指定
ZOO_SERVERS代表集群内的节点,格式为server.{节点id}={ip}:{数据同步端口}:{集群选举端口}
到该文件所处的目录下,执行 docker-compose up -d
这样我们的zk集群就启动好了
PS:如果下载镜像太慢,可以到Docker Engine的tab页中新增一些镜像源:
内容也贴一下:
"registry-mirrors": [
"https://registry.docker-cn.com",
"http://hub-mirror.c.163.com",
"https://docker.mirrors.ustc.edu.cn"
]
使用 docker exec -it zk_zoo3_1 /bin/bash 进入该容器中
接着执行 ./bin/zkServer.sh status 查看当前节点的状态
可以看到,zoo3为leader角色。
可以推测出,zoo3容器肯定是第2个启动完成的。那这个推测是怎么来的?稍后进入源码中一探究竟。
每个节点,都会有一个状态,状态被定义在QuorumPeer#ServerState枚举类中
public enum ServerState {
LOOKING,
FOLLOWING,
LEADING,
OBSERVING
}
如果一个节点处于LOOKING的状态,会去检查集群中存不存在Leader。如果不存在,则进行选举,此时ZK集群无法对外提供服务。
另外的三种状态,就和节点角色相对应。
前文已经说过,是节点id,手动指定,需要全局唯一。
全称为Zookeeper Transaction Id,即zk事务id。写请求到达Leader时,Leader会为该请求分配一个全局递增的事务id。
使用 docker exec 容器名 /bin/bash 进入该容器,再使用 echo stat | nc localhost 2181 查看节点的状态,
其中两个Follower的状态为:
Leader的状态为:
可以看到zxid字段
zxid是一个64位的标识,前32位表示epoch(年代,纪元的意思),后32位主键递增计数。
每一个Leader就像皇帝一样,有自己的年号,这一点和Raft协议中的term任期一致(PS:对Raft协议感兴趣的同学,可以参考我的另外一篇博客 22张图,带你入门分布式一致性算法Raft)
如果当前Leader宕机后,下一任Leader的zxid中的epoch就会+1,然后低32位变为0。
查看当前epoch,可以使用 cat /data/version-2/currentEpoch
是zk的启动类,main方法如下:
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
//初始化
main.initializeAndRun(args);
}
protected void initializeAndRun(String[] args) throws ConfigException, IOException {
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
//args[0]为/conf/zoo.cfg
config.parse(args[0]);
}
//以集群模式启动,毕竟当前servers的长度为3
if (args.length == 1 && config.servers.size() > 0) {
runFromConfig(config);
} else {
//以单机模式启动
ZooKeeperServerMain.main(args);
}
}
initializeAndRun主要是根据读取到的配置,决定是以集群还单机模式启动。
public void runFromConfig(QuorumPeerConfig config) throws IOException {
//QuorumPeer本身是一个Thread对象
quorumPeer = getQuorumPeer();
//设置选举方式、myid等一系列参数,没有就使用默认值
quorumPeer.setMyid(config.getServerId());
//...
quorumPeer.initialize();
quorumPeer.start();
//等待quorumPeer执行完成
quorumPeer.join();
}
这里启动了quorumPeer线程,quorumPeer可以理解为集群中的节点,其重写的start方法会完成当前节点的初始化工作,并且主线程需要等待quorumPeer执行完成。
直接进入run方法中
public synchronized void start() {
//从磁盘加载数据到内存数据库中,例如获取zxid、epoch
loadDataBase();
//准备接受客户端请求
cnxnFactory.start();
//准备进行Leader选举的环境
startLeaderElection();
//这里将调用本类的run方法
super.start();
}
其实只是准备了进行选举的环境,选用FastLeaderElection作为Leader选举的策略。
该策略会创建一个用于维护集群各个节点之间通信的QuorumCnxManager对象,节点对外的投票,首先会放入FastLeaderElection.sendqueue中,之后由QuorumCnxManager发送到另外一个节点。如果收到其他节点的投票信息,则由QuorumCnxManager先存入FastLeaderElection.recvqueue中,再由当前节点消费。
这个时候,节点之间还没有进行相互投票。所以说,startLeaderElection只是初始化了投票环境。
super.start将会调用本类的run方法
while (running) {
switch (getPeerState()) {
case LOOKING:
//刚启动的节点,默认处于Looking状态
try {
//寻找leader,下面会细讲
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
setPeerState(ServerState.LOOKING);
}
break;
case OBSERVING:
setObserver(makeObserver(logFactory));
observer.observeLeader();
break;
case FOLLOWING:
setFollower(makeFollower(logFactory));
follower.followLeader();
break;
case LEADING:
setLeader(makeLeader(logFactory));
leader.lead();
break;
}
}
run方法中是一个while循环,处于Looking状态,才会进行Leader选举。
startLeaderElection选用了FastLeaderElection作为Leader选举的策略,因此这里进入FastLeaderElection的lookForLeader方法
lookForLeader方法比较复杂,分阶段去理解它。
//创建一个投票箱(key为myid,value为投票信息),用于汇总当前集群内的投票信息
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
//保存在集群确定leader之后还收到的投票信息
//即保存所有处于FOLLOWING与LEADING状态的节点发出的投票信息
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
//等待其他节点投票的超时时间,默认为200毫秒
int notTimeout = finalizeWait;
synchronized (this) {
//递增逻辑时钟,逻辑时钟可以理解为选举届数
logicalclock.incrementAndGet();
//在每次选举中,节点都会先投自己一票
//当前方式只是更新提议,还未通知到其他节点
//getInitId():myid getInitLastLoggedZxid():日志中最大的zxid getPeerEpoch():节点的epoch
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
//将当前提议广播出去
sendNotifications();
分为两部分:
//如果当前节点处于LOOKING状态,则一直获取其他节点的投票信息,直到找到leader
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
//从recieve队列中取出一个投票信息
//上文我们说过,其他节点的投票信息,会先由QuorumCnxManager暂存到recvqueue中
Notification n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS);
//获取不到投票信息
if (n == null) {
//选择重发或者重连
//获取到投票信息
} else if (validVoter(n.sid) && validVoter(n.leader)) {
//判断进行投票的节点状态
switch (n.state) {
case LOOKING:
//......
break;
case OBSERVING:
//Observer是没有投票权的,因此这里不做处理
break;
case FOLLOWING:
case LEADING:
//......
break;
default:
break;
}
}
}
获取不到投票信息
//获取不到投票信息
if (n == null) {
//从else逻辑就可以猜出,haveDelivered方法用于判断当前节点是否和集群中的其他节点全部失联
if (manager.haveDelivered()) {
//获取不到投票信息,那就再次广播一次,其他节点也许会进行回应
//之前的回应可能由于网络原因丢失了,因此这里重试一下
sendNotifications();
} else {
//与集群中的所有节点建立连接
manager.connectAll();
}
//由于获取不到投票信息,这里将超时时间扩大为两倍
int tmpTimeOut = notTimeout * 2;
//最长不可以超过60秒
notTimeout = (Math.min(tmpTimeOut, maxNotificationInterval));
}
如果能获取到投票信息,且发送投票的节点状态为LOOKING时
case LOOKING:
//如果推荐leader的节点的epoch大于当前逻辑时钟
if (n.electionEpoch > logicalclock.get()) {
//代表当前节点可能错过了几届选举,导致自己的逻辑时钟比其他节点小
//那就沿用别人的逻辑时钟
logicalclock.set(n.electionEpoch);
//清空投票箱
recvset.clear();
//判断被推荐的leader与当前节点谁更适合当leader
//判断的根据,是选举算法的核心,稍后会细讲
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
//被推荐的leader更适合,因此更新自己的提议
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
//看来还是自己更适合,推荐自己
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
//广播提议信息
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
//如果投票中的epoch小于当前节点的逻辑时钟,说明该票是无效的
//退出switch,取出下一条投票消息
break;
//如果处于同一轮选举中,且投票中的推荐的leader更适合做leader
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
//更新自己的提议,并广播出去
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
//将发送投票消息的节点id及它的投票信息存入recvset中
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//投票箱中推荐的leader,如果和自己推荐的leader一致,且超过节点总数的一半
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
//不断取出投票信息,看leader会不会进行变动
while ((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null) {
//如果投票中推荐的leader更适合做leader
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
//把该选票重新放回,说明该轮选举还没有结束
recvqueue.put(n);
break;
}
}
//如果在限定时间内,没有取出任何投票信息,说明选举即将结束
if (n == null) {
//如果leader是自己,则设置当前状态为LEADING
//如果不是,属于PARTICIPANT就设置FOLLOWING,否则设置OBSERVING
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING : learningState());
//选举收尾动作
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock.get(),
proposedEpoch);
//清空recvqueue
leaveInstance(endVote);
return endVote;
}
}
break;
totalOrderPredicate
在totalOrderPredicate方法中,决定了谁更适合做leader,也是zk选举算法的核心
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
//判断外部节点推荐的leader的权重,
if (self.getQuorumVerifier().getWeight(newId) == 0) {
return false;
}
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
判断newId代表的节点(即投票信息中推荐的节点,以下先称为新节点)与当前节点更适合做leader,判断的规则如下:
判断当前选举是否可以结束时,需要先判断推荐的leader是否大于节点总数的一半:
protected boolean termPredicate(
HashMap<Long, Vote> votes,
Vote vote) {
HashSet<Long> set = new HashSet<Long>();
//搜集投票箱中和自己推荐一致的选票
for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())) {
set.add(entry.getKey());
}
}
return self.getQuorumVerifier().containsQuorum(set);
}
//是否大于节点总数的一半
public boolean containsQuorum(Set<Long> set){
return (set.size() > half);
}
如果能获取到投票信息,且发送投票的节点状态为FOLLOWING或LEADING时
case FOLLOWING:
case LEADING:
//如果逻辑时钟一致
if (n.electionEpoch == logicalclock.get()) {
//存入投票箱中
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));
//如果外部推荐的leader支持率过半且合法
if (ooePredicate(recvset, outofelection, n)) {
//直接退出选举,确定自己的状态
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING : learningState());
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
//在加入一个Leader确定的集群中,先确认一下是否是大多数节点都追随同一个leader
//在确定leader之后收到的投票信息,全部存入outofelection中
//即保存所有处于FOLLOWING与LEADING状态的节点发出的投票信息
outofelection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));
//如果外部节点推荐的leader在outofelection支持率过半且合法
//一般是在选举完成后,新加入一个节点,才会走该逻辑
if (ooePredicate(outofelection, outofelection, n)) {
synchronized (this) {
//同步当前节点的选举届数与状态
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING : learningState());
}
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
有两种情况会走到FOLLOWING与LEADING的case中:
以上就是处于LOOKING状态的选举流程,当选举结束后,节点的状态就会确定下来,QuorumPeer类中un方法的while循环就会按照状态进入下一个阶段。
Follower执行followLeader,Leader执行lead,Observer则执行observeLeader。
因此,如果一个节点处于选举中时,则无法对外提供服务。
下面以3个节点构成的集群为例,简要说明一下选举过程。
3个节点名称分别为zk1、zk2与zk3,数字对应于他们的myid。
按序启动这个5个节点,假设它们处于同一轮选举中,即epoch一致。
运行时间选举,指的是在启动选举完成后,当选Leader的节点宕机了,此时需要重新进行选举,在选举完成前,集群无法对外提供服务。
假设Leader3宕机,其余节点通过心跳机制感应到,将会触发新一轮选举。
下面使用(myid,zxid)的形式来表达各个节点的状态,这里假设它们的epoch是一致的,但由于同步的快慢,导致自身的zxid各不相同。
这是简化后的选举图,一图胜千言:
因此选举算法的核心口诀就是:
先比epoch,不行就再比zxid,还是不行那就比myid,且满足半数以上则当选为Leader。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_33591903/article/details/122096389
内容来源于网络,如有侵权,请联系作者删除!