面试官:能给我画个Zookeeper选举的图吗?

x33g5p2x  于2021-12-27 转载在 Zookeeper  
字(10.5k)|赞(0)|评价(0)|浏览(542)

一、前言

Zookeeper是一个分布式协调框架,提供分布式锁、配置项管理、服务注册与集群管理等功能。

为了保证Zookeeper的高可用,一般都会以集群的模式部署。

这个时候需要考虑各个节点的数据一致性,那么集群在启动时,需要先选举出一位Leader,再由Leader完成向其他节点的数据同步工作。

本文将是Zookeeper系列的第一篇文章,从源码角度讲述Zookeeper的选举算法。

二、准备工作

博主是在windows安装了docker desktop,使用docker-compose启动zk集群的。docker-compose.yml内容如下:

version: '2.2'
services:
  zoo1:
    image: zookeeper:3.4.14
    restart: always
    hostname: zoo1
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888

  zoo2:
    image: zookeeper:3.4.14
    restart: always
    hostname: zoo2
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888

  zoo3:
    image: zookeeper:3.4.14
    restart: always
    hostname: zoo3
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888

其中2181是用于客户端连接的端口,这里分别映射到了主机的三个端口上

ZOO_MY_ID代表节点id,需要手动指定

ZOO_SERVERS代表集群内的节点,格式为server.{节点id}={ip}:{数据同步端口}:{集群选举端口}

到该文件所处的目录下,执行 docker-compose up -d

这样我们的zk集群就启动好了

PS:如果下载镜像太慢,可以到Docker Engine的tab页中新增一些镜像源:

内容也贴一下:

"registry-mirrors": [
    "https://registry.docker-cn.com",
    "http://hub-mirror.c.163.com",
    "https://docker.mirrors.ustc.edu.cn"
  ]

三、基本概念

节点的角色

  • Leader,领导者,又称主节点。负责处理客户端的写请求,并将数据同步到各个子节点
  • Follower,跟随者,又称子节点。用于处理客户端的读请求,拥有投票权。
  • Observer,观察者。也可以用于处理客户端的读请求,但没有投票权,也不会参与选举与晋升。

如何查看节点的角色

使用 docker exec -it zk_zoo3_1 /bin/bash  进入该容器中

接着执行  ./bin/zkServer.sh status  查看当前节点的状态

可以看到,zoo3为leader角色。

可以推测出,zoo3容器肯定是第2个启动完成的。那这个推测是怎么来的?稍后进入源码中一探究竟。

节点的状态

每个节点,都会有一个状态,状态被定义在QuorumPeer#ServerState枚举类中

public enum ServerState {
        LOOKING,
        FOLLOWING,
        LEADING,
        OBSERVING
    }

如果一个节点处于LOOKING的状态,会去检查集群中存不存在Leader。如果不存在,则进行选举,此时ZK集群无法对外提供服务。

另外的三种状态,就和节点角色相对应。

myid

前文已经说过,是节点id,手动指定,需要全局唯一。

zxid

全称为Zookeeper Transaction Id,即zk事务id。写请求到达Leader时,Leader会为该请求分配一个全局递增的事务id。

使用  docker exec 容器名 /bin/bash  进入该容器,再使用  echo stat | nc localhost 2181  查看节点的状态,

其中两个Follower的状态为:

Leader的状态为:

可以看到zxid字段

zxid是一个64位的标识,前32位表示epoch(年代,纪元的意思),后32位主键递增计数。

每一个Leader就像皇帝一样,有自己的年号,这一点和Raft协议中的term任期一致(PS:对Raft协议感兴趣的同学,可以参考我的另外一篇博客 22张图,带你入门分布式一致性算法Raft)

如果当前Leader宕机后,下一任Leader的zxid中的epoch就会+1,然后低32位变为0。

查看当前epoch,可以使用  cat /data/version-2/currentEpoch

四、源码分析

QuorumPeerMain

是zk的启动类,main方法如下:

public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        //初始化
        main.initializeAndRun(args);
    }

    protected void initializeAndRun(String[] args) throws ConfigException, IOException {
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            //args[0]为/conf/zoo.cfg
            config.parse(args[0]);
        }

        //以集群模式启动,毕竟当前servers的长度为3
        if (args.length == 1 && config.servers.size() > 0) {
            runFromConfig(config);
        } else {
            //以单机模式启动
            ZooKeeperServerMain.main(args);
        }
    }

initializeAndRun主要是根据读取到的配置,决定是以集群还单机模式启动。

runFromConfig

public void runFromConfig(QuorumPeerConfig config) throws IOException {
        //QuorumPeer本身是一个Thread对象
        quorumPeer = getQuorumPeer();

        //设置选举方式、myid等一系列参数,没有就使用默认值
        quorumPeer.setMyid(config.getServerId());
        //...

        quorumPeer.initialize();

        quorumPeer.start();
        //等待quorumPeer执行完成
        quorumPeer.join();
    }

这里启动了quorumPeer线程,quorumPeer可以理解为集群中的节点,其重写的start方法会完成当前节点的初始化工作,并且主线程需要等待quorumPeer执行完成。

直接进入run方法中

public synchronized void start() {
        //从磁盘加载数据到内存数据库中,例如获取zxid、epoch
        loadDataBase();
        //准备接受客户端请求
        cnxnFactory.start();
        //准备进行Leader选举的环境
        startLeaderElection();
        //这里将调用本类的run方法
        super.start();
    }

startLeaderElection

其实只是准备了进行选举的环境,选用FastLeaderElection作为Leader选举的策略。

该策略会创建一个用于维护集群各个节点之间通信的QuorumCnxManager对象,节点对外的投票,首先会放入FastLeaderElection.sendqueue中,之后由QuorumCnxManager发送到另外一个节点。如果收到其他节点的投票信息,则由QuorumCnxManager先存入FastLeaderElection.recvqueue中,再由当前节点消费。

这个时候,节点之间还没有进行相互投票。所以说,startLeaderElection只是初始化了投票环境。

QuorumPeer.run

super.start将会调用本类的run方法

while (running) {
            switch (getPeerState()) {
                case LOOKING:
                    //刚启动的节点,默认处于Looking状态
                    try {
                        //寻找leader,下面会细讲
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                case OBSERVING:
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                    break;
                case FOLLOWING:
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                    break;
                case LEADING:
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    break;
            }
        }

run方法中是一个while循环,处于Looking状态,才会进行Leader选举。

lookForLeader

startLeaderElection选用了FastLeaderElection作为Leader选举的策略,因此这里进入FastLeaderElection的lookForLeader方法

lookForLeader方法比较复杂,分阶段去理解它。

第一阶段:节点先投票给自己

//创建一个投票箱(key为myid,value为投票信息),用于汇总当前集群内的投票信息
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
        //保存在集群确定leader之后还收到的投票信息
        //即保存所有处于FOLLOWING与LEADING状态的节点发出的投票信息
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

        //等待其他节点投票的超时时间,默认为200毫秒
        int notTimeout = finalizeWait;

        synchronized (this) {
            //递增逻辑时钟,逻辑时钟可以理解为选举届数
            logicalclock.incrementAndGet();
            //在每次选举中,节点都会先投自己一票
            //当前方式只是更新提议,还未通知到其他节点
            //getInitId():myid    getInitLastLoggedZxid():日志中最大的zxid    getPeerEpoch():节点的epoch
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }
        //将当前提议广播出去
        sendNotifications();

第二阶段:不断获取其他节点的投票信息,直至找到Leader

分为两部分:

  • 获取不到投票信息,选择重发或者重连
  • 获取到投票信息,处理投票信息
//如果当前节点处于LOOKING状态,则一直获取其他节点的投票信息,直到找到leader
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            //从recieve队列中取出一个投票信息
            //上文我们说过,其他节点的投票信息,会先由QuorumCnxManager暂存到recvqueue中
            Notification n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS);

            //获取不到投票信息
            if (n == null) {
            //选择重发或者重连

            //获取到投票信息
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                //判断进行投票的节点状态
                switch (n.state) {
                    case LOOKING:
                        //......
                        break;
                    case OBSERVING:
                        //Observer是没有投票权的,因此这里不做处理
                        break;
                    case FOLLOWING:
                    case LEADING:
                        //......
                        break;
                    default:
                        break;
                }
            }
        }

获取不到投票信息

//获取不到投票信息
            if (n == null) {
                //从else逻辑就可以猜出,haveDelivered方法用于判断当前节点是否和集群中的其他节点全部失联
                if (manager.haveDelivered()) {
                    //获取不到投票信息,那就再次广播一次,其他节点也许会进行回应
                    //之前的回应可能由于网络原因丢失了,因此这里重试一下
                    sendNotifications();
                } else {
                    //与集群中的所有节点建立连接
                    manager.connectAll();
                }
                //由于获取不到投票信息,这里将超时时间扩大为两倍
                int tmpTimeOut = notTimeout * 2;
                //最长不可以超过60秒
                notTimeout = (Math.min(tmpTimeOut, maxNotificationInterval));
            }

如果能获取到投票信息,且发送投票的节点状态为LOOKING时

case LOOKING:
                        //如果推荐leader的节点的epoch大于当前逻辑时钟
                        if (n.electionEpoch > logicalclock.get()) {
                            //代表当前节点可能错过了几届选举,导致自己的逻辑时钟比其他节点小
                            //那就沿用别人的逻辑时钟
                            logicalclock.set(n.electionEpoch);
                            //清空投票箱
                            recvset.clear();
                            //判断被推荐的leader与当前节点谁更适合当leader
                            //判断的根据,是选举算法的核心,稍后会细讲
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                //被推荐的leader更适合,因此更新自己的提议
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                //看来还是自己更适合,推荐自己
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            //广播提议信息
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {
                            //如果投票中的epoch小于当前节点的逻辑时钟,说明该票是无效的
                            //退出switch,取出下一条投票消息
                            break;
                            //如果处于同一轮选举中,且投票中的推荐的leader更适合做leader
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            //更新自己的提议,并广播出去
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        //将发送投票消息的节点id及它的投票信息存入recvset中
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        //投票箱中推荐的leader,如果和自己推荐的leader一致,且超过节点总数的一半
                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))) {

                            //不断取出投票信息,看leader会不会进行变动
                            while ((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null) {
                                //如果投票中推荐的leader更适合做leader
                                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)) {
                                    //把该选票重新放回,说明该轮选举还没有结束
                                    recvqueue.put(n);
                                    break;
                                }
                            }
                            //如果在限定时间内,没有取出任何投票信息,说明选举即将结束
                            if (n == null) {
                                //如果leader是自己,则设置当前状态为LEADING
                                //如果不是,属于PARTICIPANT就设置FOLLOWING,否则设置OBSERVING
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING : learningState());
                                //选举收尾动作
                                Vote endVote = new Vote(proposedLeader,
                                        proposedZxid,
                                        logicalclock.get(),
                                        proposedEpoch);
                                //清空recvqueue
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;

totalOrderPredicate

在totalOrderPredicate方法中,决定了谁更适合做leader,也是zk选举算法的核心

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        //判断外部节点推荐的leader的权重,
        if (self.getQuorumVerifier().getWeight(newId) == 0) {
            return false;
        }

        return ((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&
                        ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
    }

判断newId代表的节点(即投票信息中推荐的节点,以下先称为新节点)与当前节点更适合做leader,判断的规则如下:

  • 先比较届数,新节点的选举届数大于当前节点,则新节点更适合
  • 再比较数据新旧程度,新节点的数据新于当前节点,则新节点更适合
  • 最后比较机器id,新节点的myid大于当前节点时,则新节点

判断当前选举是否可以结束时,需要先判断推荐的leader是否大于节点总数的一半:

protected boolean termPredicate(
            HashMap<Long, Vote> votes,
            Vote vote) {

        HashSet<Long> set = new HashSet<Long>();

        //搜集投票箱中和自己推荐一致的选票
        for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
            if (vote.equals(entry.getValue())) {
                set.add(entry.getKey());
            }
        }

        return self.getQuorumVerifier().containsQuorum(set);
    }   

    //是否大于节点总数的一半
    public boolean containsQuorum(Set<Long> set){
        return (set.size() > half);
    }

如果能获取到投票信息,且发送投票的节点状态为FOLLOWING或LEADING时

case FOLLOWING:
                    case LEADING:
                        //如果逻辑时钟一致
                        if (n.electionEpoch == logicalclock.get()) {
                            //存入投票箱中
                            recvset.put(n.sid, new Vote(n.leader,
                                    n.zxid,
                                    n.electionEpoch,
                                    n.peerEpoch));
                            //如果外部推荐的leader支持率过半且合法
                            if (ooePredicate(recvset, outofelection, n)) {
                                //直接退出选举,确定自己的状态
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING : learningState());

                                Vote endVote = new Vote(n.leader,
                                        n.zxid,
                                        n.electionEpoch,
                                        n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        //在加入一个Leader确定的集群中,先确认一下是否是大多数节点都追随同一个leader

                        //在确定leader之后收到的投票信息,全部存入outofelection中
                        //即保存所有处于FOLLOWING与LEADING状态的节点发出的投票信息
                        outofelection.put(n.sid, new Vote(n.version,
                                n.leader,
                                n.zxid,
                                n.electionEpoch,
                                n.peerEpoch,
                                n.state));
                        //如果外部节点推荐的leader在outofelection支持率过半且合法
                        //一般是在选举完成后,新加入一个节点,才会走该逻辑
                        if (ooePredicate(outofelection, outofelection, n)) {
                            synchronized (this) {
                                //同步当前节点的选举届数与状态
                                logicalclock.set(n.electionEpoch);
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING : learningState());
                            }
                            Vote endVote = new Vote(n.leader,
                                    n.zxid,
                                    n.electionEpoch,
                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;

有两种情况会走到FOLLOWING与LEADING的case中:

  • 集群已经选举出Leader,但其他节点都未及时通知到当前节点,此时n的逻辑时钟与当前一致。
  • 集群已经选举出Leader,但后来又加入了一台机器,此时逻辑时钟大概率不一致。

以上就是处于LOOKING状态的选举流程,当选举结束后,节点的状态就会确定下来,QuorumPeer类中un方法的while循环就会按照状态进入下一个阶段。

Follower执行followLeader,Leader执行lead,Observer则执行observeLeader。

因此,如果一个节点处于选举中时,则无法对外提供服务。

五、总结

下面以3个节点构成的集群为例,简要说明一下选举过程。

3个节点名称分别为zk1、zk2与zk3,数字对应于他们的myid。

启动时期选举

按序启动这个5个节点,假设它们处于同一轮选举中,即epoch一致。

  1. 先启动zk1,先投自己1票,此时zk1获得1票,但未超过半数,无法当选Leader,状态还是处于LOOKING。
  2. 接着启动zk2后,zk2也先投自己1票。zk2广播投票结果后,zk1会发现自己的epoch、zxid都与zk2相同,但myid小于zk2,因此zk1改投zk2。此时zk1获得0票,zk2获得2票,还是没有超过半数节点,zk1与zk2依然处于LOOKING。
  3. 稍后启动zk3后,zk3也先投自己1票。zk3广播投票结果后,zk1与zk2将会改投zk3。此时zk1获得0票,zk2获得0票,zk3获得3票,超过半数节点,当选为Leader,之后将状态改为LEADING,zk1与zk2则将状态改为FOLLOWING。
  4. 然后启动zk4,zk4也是先投自己1票。通过广播后,收到其他节点的投票信息,发现事情已成定局,自己来晚了,于是直接服从多数,直接将状态改为FOLLOWING。
  5. 最后启动zk5,和zk4一样的结果,状态改为FOLLOWING。

运行时期选举

运行时间选举,指的是在启动选举完成后,当选Leader的节点宕机了,此时需要重新进行选举,在选举完成前,集群无法对外提供服务。

假设Leader3宕机,其余节点通过心跳机制感应到,将会触发新一轮选举。

下面使用(myid,zxid)的形式来表达各个节点的状态,这里假设它们的epoch是一致的,但由于同步的快慢,导致自身的zxid各不相同。

  • zk1(1,5)
  • zk2(2,6)
  • zk3(3,10)
  • zk4(4,8)
  • zk5(5,7)

这是简化后的选举图,一图胜千言:

因此选举算法的核心口诀就是:

先比epoch,不行就再比zxid,还是不行那就比myid,且满足半数以上则当选为Leader。

相关文章