文章24 | 阅读 12816 | 点赞0
关于raft的原理就不多介绍了,有兴趣的百度吧。这里提供一个小动画,蛮形象的,加深理解吧。
我们知道raft节点的状态只有leader、follower、candidate三种。各状态流转如下:
然后我们讲几个概念:
然后有两个超时时间
下面我们看一下nacos启动时做了什么吧。我们思考一下既然要看启动做了什么,那么怎么进行呢?
我们通过上面总结出来的几个点再去看。 首先需要筛选出来哪些类需要关注的,这里直接列出来吧:
ServerMemberManager、RaftPeerSet、RaftCore等。
先看一下定义
@Component(value = "serverMemberManager")
public class ServerMemberManager implements ApplicationListener<WebServerInitializedEvent> {
...
}
可以看到,监听了WebServerInitializedEvent事件。我们需要关注一下onApplicationEvent方法。
public ServerMemberManager(ServletContext servletContext) throws Exception {
// 初始化一个空的跳表
this.serverList = new ConcurrentSkipListMap<>();
// 设置上下文
ApplicationUtils.setContextPath(servletContext.getContextPath());
// MemberUtils 设置manager,后面使用工具方法就有目标了。
MemberUtils.setManager(this);
// 初始化
init();
}
然后是init方法:
protected void init() throws NacosException {
Loggers.CORE.info("Nacos-related cluster resource initialization");
// 本机提供服务的端口
this.port = ApplicationUtils.getProperty("server.port", Integer.class, 8848);
// 本机服务地址
this.localAddress = InetUtils.getSelfIp() + ":" + port;
// 初始化self, 就是构造一个Member对象。
this.self = MemberUtils.singleParse(this.localAddress);
// 设置version
this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);
// 服务列表先把自己加进去
serverList.put(self.getAddress(), self);
// register NodeChangeEvent publisher to NotifyManager
// 注册事件监听 MembersChangeEvent事件
registerClusterEvent();
// Initializes the lookup mode
// 选择lookup的模式,进行serverList的初始化
initAndStartLookup();
if (serverList.isEmpty()) {
throw new NacosException(NacosException.SERVER_ERROR, "cannot get serverlist, so exit.");
}
Loggers.CORE.info("The cluster resource is initialized");
}
先看一下registerClusterEvent方法:
private void registerClusterEvent() {
// Register node change events
// 通知中心注册MembersChangeEvent事件
NotifyCenter.registerToPublisher(MembersChangeEvent.class,
ApplicationUtils.getProperty("nacos.member-change-event.queue.size", Integer.class, 128));
// 添加IPChangeEvent事件订阅
NotifyCenter.registerSubscriber(new Subscriber<InetUtils.IPChangeEvent>() {
@Override
public void onEvent(InetUtils.IPChangeEvent event) {
String newAddress = event.getNewIp() + ":" + port;
ServerMemberManager.this.localAddress = newAddress;
ApplicationUtils.setLocalAddress(localAddress);
Member self = ServerMemberManager.this.self;
self.setIp(event.getNewIp());
String oldAddress = event.getOldIp() + ":" + port;
ServerMemberManager.this.serverList.remove(oldAddress);
ServerMemberManager.this.serverList.put(newAddress, self);
ServerMemberManager.this.memberAddressInfos.remove(oldAddress);
ServerMemberManager.this.memberAddressInfos.add(newAddress);
}
@Override
public Class<? extends Event> subscribeType() {
return InetUtils.IPChangeEvent.class;
}
});
}
首先注册MembersChangeEvent事件
然后订阅IPChangeEvent事件。主要就是更新self、serverList和memberAddressInfos。
initAndStartLookup方法:
private void initAndStartLookup() throws NacosException {
this.lookup = LookupFactory.createLookUp(this);
this.lookup.start();
}
通过工厂创建一个lookup,然后start。这里的lookup的实现主要有AddressServerMemberLookup(通过地址服务)、FileConfigMemberLookup(通过文件配置)、StandaloneMemberLookup(单机模式)。我们看一下FileConfigMemberLookup的start,另外两个有兴趣自行查看:
@Override
public void start() throws NacosException {
if (start.compareAndSet(false, true)) {
readClusterConfFromDisk();
// Use the inotify mechanism to monitor file changes and automatically
// trigger the reading of cluster.conf
try {
WatchFileCenter.registerWatcher(ApplicationUtils.getConfFilePath(), watcher);
} catch (Throwable e) {
Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());
}
}
}
首先从文件中读出集群信息,然后添加一个监听器,用于监听文件变化。
private void readClusterConfFromDisk() {
Collection<Member> tmpMembers = new ArrayList<>();
try {
List<String> tmp = ApplicationUtils.readClusterConf();
tmpMembers = MemberUtils.readServerConf(tmp);
} catch (Throwable e) {
Loggers.CLUSTER
.error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());
}
afterLookup(tmpMembers);
}
@Override
public void afterLookup(Collection<Member> members) {
this.memberManager.memberChange(members);
}
读出来后调用afterLookup,实际上就是ServerMemberManager的memberChange方法:
synchronized boolean memberChange(Collection<Member> members) {
if (members == null || members.isEmpty()) {
return false;
}
boolean isContainSelfIp = members.stream()
.anyMatch(ipPortTmp -> Objects.equals(localAddress, ipPortTmp.getAddress()));
if (isContainSelfIp) {
isInIpList = true;
} else {
isInIpList = false;
members.add(this.self);
Loggers.CLUSTER.warn("[serverlist] self ip {} not in serverlist {}", self, members);
}
boolean hasChange = members.size() != serverList.size();
ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap<>();
Set<String> tmpAddressInfo = new ConcurrentHashSet<>();
for (Member member : members) {
final String address = member.getAddress();
if (!serverList.containsKey(address)) {
hasChange = true;
}
// Ensure that the node is created only once
tmpMap.put(address, member);
tmpAddressInfo.add(address);
}
serverList = tmpMap;
memberAddressInfos = tmpAddressInfo;
Collection<Member> finalMembers = allMembers();
Loggers.CLUSTER.warn("[serverlist] updated to : {}", finalMembers);
if (hasChange) {
MemberUtils.syncToFile(finalMembers);
Set<Member> healthMembers = MemberUtils.selectTargetMembers(members, member -> {
return !NodeState.DOWN.equals(member.getState());
});
Event event = MembersChangeEvent.builder().members(finalMembers).build();
NotifyCenter.publishEvent(event);
}
return hasChange;
}
就是初始化serverList和memberAddressInfos,如果有变更还会发布MembersChangeEvent事件。
@Override
public void onApplicationEvent(WebServerInitializedEvent event) {
getSelf().setState(NodeState.UP);
if (!ApplicationUtils.getStandaloneMode()) {
GlobalExecutor.scheduleByCommon(this.infoReportTask, 5_000L);
}
ApplicationUtils.setPort(event.getWebServer().getPort());
ApplicationUtils.setLocalAddress(this.localAddress);
Loggers.CLUSTER.info("This node is ready to provide external services");
}
设置self的状态为up。如果不是单机模式,延迟5秒启动一个任务infoReportTask。 我们看一下这个任务的定义:
@Override
protected void executeBody() {
List<Member> members = ServerMemberManager.this.allMembersWithoutSelf();
// 不包含自己的member集合
if (members.isEmpty()) {
return;
}
// 这里相当于一个游标,每次任务执行往下跳一个。
this.cursor = (this.cursor + 1) % members.size();
// 目标节点
Member target = members.get(cursor);
Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress());
// /nacos/v1/core/cluster/report
final String url = HttpUtils
.buildUrl(false, target.getAddress(), ApplicationUtils.getContextPath(), Commons.NACOS_CORE_CONTEXT,
"/cluster/report");
try {
asyncRestTemplate
.post(url, Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version),
Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value()
|| result.getCode() == HttpStatus.NOT_FOUND.value()) {
Loggers.CLUSTER
.warn("{} version is too low, it is recommended to upgrade the version : {}",
target, VersionUtils.version);
return;
}
if (result.ok()) {
MemberUtils.onSuccess(target);
} else {
Loggers.CLUSTER
.warn("failed to report new info to target node : {}, result : {}",
target.getAddress(), result);
MemberUtils.onFail(target);
}
}
@Override
public void onError(Throwable throwable) {
Loggers.CLUSTER
.error("failed to report new info to target node : {}, error : {}",
target.getAddress(),
ExceptionUtil.getAllExceptionMsg(throwable));
MemberUtils.onFail(target, throwable);
}
@Override
public void onCancel() {
}
});
} catch (Throwable ex) {
Loggers.CLUSTER.error("failed to report new info to target node : {}, error : {}", target.getAddress(),
ExceptionUtil.getAllExceptionMsg(ex));
}
}
每次执行把自己self发送给对方。发送成功/失败会更新目标的状态。/nacos/v1/core/cluster/report
然后看一下after方法:
@Override
protected void after() {
GlobalExecutor.scheduleByCommon(this, 2_000L);
}
就是把任务丢回去,延迟2秒执行。
我们看一下接收端,NacosClusterController的report方法:
@PostMapping(value = {"/report"})
public RestResult<String> report(@RequestBody Member node) {
if (!node.check()) {
return RestResultUtils.failedWithMsg(400, "Node information is illegal");
}
LoggerUtils.printIfDebugEnabled(Loggers.CLUSTER, "node state report, receive info : {}", node);
node.setState(NodeState.UP);
node.setFailAccessCnt(0);
boolean result = memberManager.update(node);
return RestResultUtils.success(Boolean.toString(result));
}
然后是memberManager.update:
public boolean update(Member newMember) {
Loggers.CLUSTER.debug("member information update : {}", newMember);
String address = newMember.getAddress();
if (!serverList.containsKey(address)) {
return false;
}
serverList.computeIfPresent(address, (s, member) -> {
if (NodeState.DOWN.equals(newMember.getState())) {
memberAddressInfos.remove(newMember.getAddress());
}
if (!MemberUtils.fullEquals(newMember, member)) {
newMember.setExtendVal(MemberMetaDataConstants.LAST_REFRESH_TIME, System.currentTimeMillis());
MemberUtils.copy(newMember, member);
// member data changes and all listeners need to be notified
NotifyCenter.publishEvent(MembersChangeEvent.builder().members(allMembers()).build());
}
return member;
});
return true;
}
就是更新目标节点serverList中对应节点的状态。
ServerMemberManager初始化过程
RaftPeerSet是依赖ProtocolManager的创建的,而ProtocolManager的创建依赖ServerMemberManager(@DependsOn)。也就是说初始化RaftPeerSet时,集群服务列表已经加载完成了
先看一下它的初始化方法,@PostConstruct注解的init方法:
@PostConstruct
public void init() {
NotifyCenter.registerSubscriber(this);
changePeers(memberManager.allMembers());
}
注册MembersChangeEvent事件订阅者,更新peers
private void changePeers(Collection<Member> members) {
Map<String, RaftPeer> tmpPeers = new HashMap<>(members.size());
// 遍历各节点
for (Member member : members) {
final String address = member.getAddress();
// 如果已经有了,直接添加到临时的里面
if (peers.containsKey(address)) {
tmpPeers.put(address, peers.get(address));
continue;
}
// 创建一个新的peer
RaftPeer raftPeer = new RaftPeer();
raftPeer.ip = address;
// first time meet the local server:
if (ApplicationUtils.getLocalAddress().equals(address)) {
raftPeer.term.set(localTerm.get());
}
// 也放进去
tmpPeers.put(address, raftPeer);
}
// replace raft peer set:
peers = tmpPeers;
// ready了,可以进行投票了
ready = true;
Loggers.RAFT.info("raft peers changed: " + members);
}
关键是这个ready = true,当RaftPeer准备好后才能发起投票。
我们看一下RaftPeer:
public class RaftPeer {
public String ip;
public String voteFor;
// leader的任期标识,每次leader变更term+1
public AtomicLong term = new AtomicLong(0L);
// leader任期 0-15随机
public volatile long leaderDueMs = RandomUtils.nextLong(0, GlobalExecutor.LEADER_TIMEOUT_MS);
// 心跳任期 0-5随机
public volatile long heartbeatDueMs = RandomUtils.nextLong(0, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
public volatile State state = State.FOLLOWER;
// 重置leader任期 15+0~5 即 15<leaderDueMs<20
public void resetLeaderDue() {
leaderDueMs = GlobalExecutor.LEADER_TIMEOUT_MS + RandomUtils.nextLong(0, GlobalExecutor.RANDOM_MS);
}
// 重置心跳任期 5秒
public void resetHeartbeatDue() {
heartbeatDueMs = GlobalExecutor.HEARTBEAT_INTERVAL_MS;
}
...
}
它监听了MembersChangeEvent事件,我们看一下onEvent方法:
@Override
public void onEvent(MembersChangeEvent event) {
Collection<Member> members = event.getMembers();
if (oldMembers == null) {
oldMembers = new HashSet<>(members);
} else {
oldMembers.removeAll(members);
}
if (!oldMembers.isEmpty()) {
changePeers(members);
}
oldMembers.clear();
oldMembers.addAll(members);
}
事件传过来的members从oldMembers里移除,如果此时oldMembers非空,说明老的和新的存在差异,更新peers, 最后oldMembers也更新。
RaftPeerSet主要是peers列表的准备工作,当准备完成后设置ready=true, 此时RaftCore中的任务会根据这个状态来决定是否开始进行选举、心跳等。
@PostConstruct注解的init方法:
@PostConstruct
public void init() throws Exception {
Loggers.RAFT.info("initializing Raft sub-system");
// 启动notifier
executor.submit(notifier);
final long start = System.currentTimeMillis();
// 加载数据
raftStore.loadDatums(notifier, datums);
// 设置任期 term
setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
// 没有任务才会往下执行
while (true) {
if (notifier.tasks.size() <= 0) {
break;
}
Thread.sleep(1000L);
}
initialized = true;
Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
// 选举任务 500毫秒
GlobalExecutor.registerMasterElection(new MasterElection());
// 心跳任务 500毫秒
GlobalExecutor.registerHeartbeat(new HeartBeat());
Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}
这里主要关注选举任务和心跳任务。这两个任务的周期是有点区别的,MasterElection是固定500毫秒执行一次。HeartBeat是固定延迟500毫秒执行一次,即每次执行完等500毫秒。
@Override
public void run() {
try {
// ready状态
if (!peers.isReady()) {
return;
}
//获取本地结点
RaftPeer local = peers.local();
// 任期-500毫秒
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
//任期超时没到
if (local.leaderDueMs > 0) {
return;
}
// 重置任期超时时间和心跳时间,准备开始拉票
// reset timeout
local.resetLeaderDue();
local.resetHeartbeatDue();
// 发起投票
sendVote();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while master election {}", e);
}
}
首先判断是否ready,然后看看自己的任期有没有到,如果到了重置任期和心跳,然后开始发起投票。 具体投票过程后面我们再分析。
这里任期-500毫秒,就是一个任务的周期。
@Override
public void run() {
try {
if (!peers.isReady()) {
return;
}
RaftPeer local = peers.local();
// -500毫秒
local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
// 没有到,退出
if (local.heartbeatDueMs > 0) {
return;
}
//重置心跳
local.resetHeartbeatDue();
sendBeat();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
}
}
一样也是先判断是否ready。 然后看看心跳超时是否到了,如果到了,再发个心跳。具体sendBeat过程后面再分析。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_19414183/article/details/112514557
内容来源于网络,如有侵权,请联系作者删除!