Nacos源码分析二十三、Raft选举过程(1)

x33g5p2x  于2021-12-20 转载在 其他  
字(13.3k)|赞(0)|评价(0)|浏览(602)

关于raft的原理就不多介绍了,有兴趣的百度吧。这里提供一个小动画,蛮形象的,加深理解吧。

我们知道raft节点的状态只有leader、follower、candidate三种。各状态流转如下:

然后我们讲几个概念:

  1. majority 大多数
  2. term任期 – 任期是不断增加的
  3. election选举

然后有两个超时时间

  1. election timeout – 选举超时时间
  2. heatbeat timeout – 心跳超时时间

下面我们看一下nacos启动时做了什么吧。我们思考一下既然要看启动做了什么,那么怎么进行呢?

  1. 筛选出哪些类需要关注的
  2. 这些需要关注的类的构造方法
  3. 由于是SpringBoot环境,要考虑@PostConstruct注解的方法
  4. 同样由于是SpringBoot环境,对应的事件监听、处理器等也要考虑。

我们通过上面总结出来的几个点再去看。 首先需要筛选出来哪些类需要关注的,这里直接列出来吧:

ServerMemberManager、RaftPeerSet、RaftCore等。

ServerMemberManager

先看一下定义

@Component(value = "serverMemberManager")
public class ServerMemberManager implements ApplicationListener<WebServerInitializedEvent> {
	...
}

可以看到,监听了WebServerInitializedEvent事件。我们需要关注一下onApplicationEvent方法。

构造方法:
public ServerMemberManager(ServletContext servletContext) throws Exception {
    // 初始化一个空的跳表
    this.serverList = new ConcurrentSkipListMap<>();
    // 设置上下文
    ApplicationUtils.setContextPath(servletContext.getContextPath());
    // MemberUtils 设置manager,后面使用工具方法就有目标了。
    MemberUtils.setManager(this);
    // 初始化
    init();
}

然后是init方法:

protected void init() throws NacosException {
    Loggers.CORE.info("Nacos-related cluster resource initialization");
    // 本机提供服务的端口
    this.port = ApplicationUtils.getProperty("server.port", Integer.class, 8848);
    // 本机服务地址
    this.localAddress = InetUtils.getSelfIp() + ":" + port;
    // 初始化self, 就是构造一个Member对象。
    this.self = MemberUtils.singleParse(this.localAddress);
    // 设置version
    this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);
    // 服务列表先把自己加进去
    serverList.put(self.getAddress(), self);

    // register NodeChangeEvent publisher to NotifyManager
    // 注册事件监听 MembersChangeEvent事件
    registerClusterEvent();

    // Initializes the lookup mode
    // 选择lookup的模式,进行serverList的初始化
    initAndStartLookup();

    if (serverList.isEmpty()) {
        throw new NacosException(NacosException.SERVER_ERROR, "cannot get serverlist, so exit.");
    }

    Loggers.CORE.info("The cluster resource is initialized");
}

先看一下registerClusterEvent方法:

private void registerClusterEvent() {
    // Register node change events
    // 通知中心注册MembersChangeEvent事件
    NotifyCenter.registerToPublisher(MembersChangeEvent.class,
            ApplicationUtils.getProperty("nacos.member-change-event.queue.size", Integer.class, 128));
    
    // 添加IPChangeEvent事件订阅
    NotifyCenter.registerSubscriber(new Subscriber<InetUtils.IPChangeEvent>() {
        @Override
        public void onEvent(InetUtils.IPChangeEvent event) {
            String newAddress = event.getNewIp() + ":" + port;
            ServerMemberManager.this.localAddress = newAddress;
            ApplicationUtils.setLocalAddress(localAddress);

            Member self = ServerMemberManager.this.self;
            self.setIp(event.getNewIp());

            String oldAddress = event.getOldIp() + ":" + port;
            ServerMemberManager.this.serverList.remove(oldAddress);
            ServerMemberManager.this.serverList.put(newAddress, self);
            
            ServerMemberManager.this.memberAddressInfos.remove(oldAddress);
            ServerMemberManager.this.memberAddressInfos.add(newAddress);
        }
        
        @Override
        public Class<? extends Event> subscribeType() {
            return InetUtils.IPChangeEvent.class;
        }
    });
}

首先注册MembersChangeEvent事件

然后订阅IPChangeEvent事件。主要就是更新self、serverList和memberAddressInfos。

initAndStartLookup方法:

private void initAndStartLookup() throws NacosException {
    this.lookup = LookupFactory.createLookUp(this);
    this.lookup.start();
}

通过工厂创建一个lookup,然后start。这里的lookup的实现主要有AddressServerMemberLookup(通过地址服务)、FileConfigMemberLookup(通过文件配置)、StandaloneMemberLookup(单机模式)。我们看一下FileConfigMemberLookup的start,另外两个有兴趣自行查看:

@Override
public void start() throws NacosException {
    if (start.compareAndSet(false, true)) {
        readClusterConfFromDisk();
        
        // Use the inotify mechanism to monitor file changes and automatically
        // trigger the reading of cluster.conf
        try {
            WatchFileCenter.registerWatcher(ApplicationUtils.getConfFilePath(), watcher);
        } catch (Throwable e) {
            Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());
        }
    }
}

首先从文件中读出集群信息,然后添加一个监听器,用于监听文件变化。

private void readClusterConfFromDisk() {
    Collection<Member> tmpMembers = new ArrayList<>();
    try {
        List<String> tmp = ApplicationUtils.readClusterConf();
        tmpMembers = MemberUtils.readServerConf(tmp);
    } catch (Throwable e) {
        Loggers.CLUSTER
                .error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());
    }
    
    afterLookup(tmpMembers);
}

@Override
public void afterLookup(Collection<Member> members) {
    this.memberManager.memberChange(members);
}

读出来后调用afterLookup,实际上就是ServerMemberManager的memberChange方法:

synchronized boolean memberChange(Collection<Member> members) {
    
    if (members == null || members.isEmpty()) {
        return false;
    }
    
    boolean isContainSelfIp = members.stream()
            .anyMatch(ipPortTmp -> Objects.equals(localAddress, ipPortTmp.getAddress()));
    
    if (isContainSelfIp) {
        isInIpList = true;
    } else {
        isInIpList = false;
        members.add(this.self);
        Loggers.CLUSTER.warn("[serverlist] self ip {} not in serverlist {}", self, members);
    }
    
    boolean hasChange = members.size() != serverList.size();
    ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap<>();
    Set<String> tmpAddressInfo = new ConcurrentHashSet<>();
    for (Member member : members) {
        final String address = member.getAddress();
        
        if (!serverList.containsKey(address)) {
            hasChange = true;
        }
        
        // Ensure that the node is created only once
        tmpMap.put(address, member);
        tmpAddressInfo.add(address);
    }
    
    serverList = tmpMap;
    memberAddressInfos = tmpAddressInfo;
    
    Collection<Member> finalMembers = allMembers();
    
    Loggers.CLUSTER.warn("[serverlist] updated to : {}", finalMembers);
    
    if (hasChange) {
        MemberUtils.syncToFile(finalMembers);
        Set<Member> healthMembers = MemberUtils.selectTargetMembers(members, member -> {
            return !NodeState.DOWN.equals(member.getState());
        });
        Event event = MembersChangeEvent.builder().members(finalMembers).build();
        NotifyCenter.publishEvent(event);
    }
    
    return hasChange;
}

就是初始化serverList和memberAddressInfos,如果有变更还会发布MembersChangeEvent事件。

onApplicationEvent方法:
@Override
public void onApplicationEvent(WebServerInitializedEvent event) {
    getSelf().setState(NodeState.UP);
    if (!ApplicationUtils.getStandaloneMode()) {
        GlobalExecutor.scheduleByCommon(this.infoReportTask, 5_000L);
    }
    ApplicationUtils.setPort(event.getWebServer().getPort());
    ApplicationUtils.setLocalAddress(this.localAddress);
    Loggers.CLUSTER.info("This node is ready to provide external services");
}

设置self的状态为up。如果不是单机模式,延迟5秒启动一个任务infoReportTask。 我们看一下这个任务的定义:

@Override
protected void executeBody() {
    List<Member> members = ServerMemberManager.this.allMembersWithoutSelf();

    // 不包含自己的member集合
    if (members.isEmpty()) {
        return;
    }

    // 这里相当于一个游标,每次任务执行往下跳一个。
    this.cursor = (this.cursor + 1) % members.size();
    // 目标节点
    Member target = members.get(cursor);
    
    Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress());

    // /nacos/v1/core/cluster/report
    final String url = HttpUtils
            .buildUrl(false, target.getAddress(), ApplicationUtils.getContextPath(), Commons.NACOS_CORE_CONTEXT,
                    "/cluster/report");
    
    try {
        asyncRestTemplate
                .post(url, Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version),
                        Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() {
                            @Override
                            public void onReceive(RestResult<String> result) {
                                if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value()
                                        || result.getCode() == HttpStatus.NOT_FOUND.value()) {
                                    Loggers.CLUSTER
                                            .warn("{} version is too low, it is recommended to upgrade the version : {}",
                                                    target, VersionUtils.version);
                                    return;
                                }
                                if (result.ok()) {
                                    MemberUtils.onSuccess(target);
                                } else {
                                    Loggers.CLUSTER
                                            .warn("failed to report new info to target node : {}, result : {}",
                                                    target.getAddress(), result);
                                    MemberUtils.onFail(target);
                                }
                            }
                            
                            @Override
                            public void onError(Throwable throwable) {
                                Loggers.CLUSTER
                                        .error("failed to report new info to target node : {}, error : {}",
                                                target.getAddress(),
                                                ExceptionUtil.getAllExceptionMsg(throwable));
                                MemberUtils.onFail(target, throwable);
                            }
    
                            @Override
                            public void onCancel() {
        
                            }
                        });
    } catch (Throwable ex) {
        Loggers.CLUSTER.error("failed to report new info to target node : {}, error : {}", target.getAddress(),
                ExceptionUtil.getAllExceptionMsg(ex));
    }
}

每次执行把自己self发送给对方。发送成功/失败会更新目标的状态。/nacos/v1/core/cluster/report

然后看一下after方法:

@Override
protected void after() {
    GlobalExecutor.scheduleByCommon(this, 2_000L);
}

就是把任务丢回去,延迟2秒执行。

我们看一下接收端,NacosClusterController的report方法:

@PostMapping(value = {"/report"})
public RestResult<String> report(@RequestBody Member node) {
    if (!node.check()) {
        return RestResultUtils.failedWithMsg(400, "Node information is illegal");
    }
    LoggerUtils.printIfDebugEnabled(Loggers.CLUSTER, "node state report, receive info : {}", node);
    node.setState(NodeState.UP);
    node.setFailAccessCnt(0);
    
    boolean result = memberManager.update(node);
    
    return RestResultUtils.success(Boolean.toString(result));
}

然后是memberManager.update:

public boolean update(Member newMember) {
    Loggers.CLUSTER.debug("member information update : {}", newMember);
    
    String address = newMember.getAddress();
    if (!serverList.containsKey(address)) {
        return false;
    }
    
    serverList.computeIfPresent(address, (s, member) -> {
        if (NodeState.DOWN.equals(newMember.getState())) {
            memberAddressInfos.remove(newMember.getAddress());
        }
        if (!MemberUtils.fullEquals(newMember, member)) {
            newMember.setExtendVal(MemberMetaDataConstants.LAST_REFRESH_TIME, System.currentTimeMillis());
            MemberUtils.copy(newMember, member);
            // member data changes and all listeners need to be notified
            NotifyCenter.publishEvent(MembersChangeEvent.builder().members(allMembers()).build());
        }
        return member;
    });
    
    return true;
}

就是更新目标节点serverList中对应节点的状态。

总结一下

ServerMemberManager初始化过程

  1. 注册MembersChangeEvent事件,添加IPChangeEvent监听
  2. 通过lookup初始化集群服务列表
  3. 创建一个延迟5秒,周期2秒的MemberInfoReportTask任务,为了集群中各节点同步节点状态的。

RaftPeerSet

RaftPeerSet是依赖ProtocolManager的创建的,而ProtocolManager的创建依赖ServerMemberManager(@DependsOn)。也就是说初始化RaftPeerSet时,集群服务列表已经加载完成了

init

先看一下它的初始化方法,@PostConstruct注解的init方法:

@PostConstruct
public void init() {
    NotifyCenter.registerSubscriber(this);
    changePeers(memberManager.allMembers());
}

注册MembersChangeEvent事件订阅者,更新peers

private void changePeers(Collection<Member> members) {
    Map<String, RaftPeer> tmpPeers = new HashMap<>(members.size());

    // 遍历各节点
    for (Member member : members) {

        final String address = member.getAddress();
        // 如果已经有了,直接添加到临时的里面
        if (peers.containsKey(address)) {
            tmpPeers.put(address, peers.get(address));
            continue;
        }

        // 创建一个新的peer
        RaftPeer raftPeer = new RaftPeer();
        raftPeer.ip = address;

        // first time meet the local server:
        if (ApplicationUtils.getLocalAddress().equals(address)) {
            raftPeer.term.set(localTerm.get());
        }

        // 也放进去
        tmpPeers.put(address, raftPeer);
    }

    // replace raft peer set:
    peers = tmpPeers;

    // ready了,可以进行投票了
    ready = true;
    Loggers.RAFT.info("raft peers changed: " + members);
}

关键是这个ready = true,当RaftPeer准备好后才能发起投票。

我们看一下RaftPeer:

public class RaftPeer {
    
    public String ip;
    
    public String voteFor;

    // leader的任期标识,每次leader变更term+1
    public AtomicLong term = new AtomicLong(0L);

    // leader任期 0-15随机
    public volatile long leaderDueMs = RandomUtils.nextLong(0, GlobalExecutor.LEADER_TIMEOUT_MS);

    // 心跳任期 0-5随机
    public volatile long heartbeatDueMs = RandomUtils.nextLong(0, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
    
    public volatile State state = State.FOLLOWER;
    
    // 重置leader任期 15+0~5 即 15<leaderDueMs<20
    public void resetLeaderDue() {
        leaderDueMs = GlobalExecutor.LEADER_TIMEOUT_MS + RandomUtils.nextLong(0, GlobalExecutor.RANDOM_MS);
    }
    
    // 重置心跳任期 5秒
    public void resetHeartbeatDue() {
        heartbeatDueMs = GlobalExecutor.HEARTBEAT_INTERVAL_MS;
    }
    
   ...
}
onEvent

它监听了MembersChangeEvent事件,我们看一下onEvent方法:

@Override
public void onEvent(MembersChangeEvent event) {
    Collection<Member> members = event.getMembers();
    if (oldMembers == null) {
        oldMembers = new HashSet<>(members);
    } else {
        oldMembers.removeAll(members);
    }
    
    if (!oldMembers.isEmpty()) {
        changePeers(members);
    }

    oldMembers.clear();
    oldMembers.addAll(members);
}

事件传过来的members从oldMembers里移除,如果此时oldMembers非空,说明老的和新的存在差异,更新peers, 最后oldMembers也更新。

总结一下

RaftPeerSet主要是peers列表的准备工作,当准备完成后设置ready=true, 此时RaftCore中的任务会根据这个状态来决定是否开始进行选举、心跳等。

RaftCore

@PostConstruct注解的init方法:

@PostConstruct
public void init() throws Exception {
    
    Loggers.RAFT.info("initializing Raft sub-system");
    
    // 启动notifier
    executor.submit(notifier);
    
    final long start = System.currentTimeMillis();
    
    // 加载数据
    raftStore.loadDatums(notifier, datums);
    
    // 设置任期 term
    setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
    
    Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
    
    // 没有任务才会往下执行
    while (true) {
        if (notifier.tasks.size() <= 0) {
            break;
        }
        Thread.sleep(1000L);
    }
    
    initialized = true;
    
    Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));

    // 选举任务 500毫秒
    GlobalExecutor.registerMasterElection(new MasterElection());

    // 心跳任务 500毫秒
    GlobalExecutor.registerHeartbeat(new HeartBeat());
    
    Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
            GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}

这里主要关注选举任务和心跳任务。这两个任务的周期是有点区别的,MasterElection是固定500毫秒执行一次。HeartBeat是固定延迟500毫秒执行一次,即每次执行完等500毫秒。

MasterElection
@Override
public void run() {
    try {
        // ready状态
        if (!peers.isReady()) {
            return;
        }

        //获取本地结点
        RaftPeer local = peers.local();
        // 任期-500毫秒
        local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;

        //任期超时没到
        if (local.leaderDueMs > 0) {
            return;
        }

        // 重置任期超时时间和心跳时间,准备开始拉票
        // reset timeout
        local.resetLeaderDue();
        local.resetHeartbeatDue();

        // 发起投票
        sendVote();
    } catch (Exception e) {
        Loggers.RAFT.warn("[RAFT] error while master election {}", e);
    }
    
}

首先判断是否ready,然后看看自己的任期有没有到,如果到了重置任期和心跳,然后开始发起投票。 具体投票过程后面我们再分析。

这里任期-500毫秒,就是一个任务的周期。

HeartBeat
@Override
public void run() {
    try {
        
        if (!peers.isReady()) {
            return;
        }
        
        RaftPeer local = peers.local();
        // -500毫秒
        local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
        // 没有到,退出
        if (local.heartbeatDueMs > 0) {
            return;
        }

        //重置心跳
        local.resetHeartbeatDue();
        
        sendBeat();
    } catch (Exception e) {
        Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
    }
    
}

一样也是先判断是否ready。 然后看看心跳超时是否到了,如果到了,再发个心跳。具体sendBeat过程后面再分析。

总结一下
  1. 启动notifier
  2. 加载raft数据
  3. 设置term任期
  4. 启动选举任务和心跳任务

相关文章