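In the previous article on service registration, we saw that each Service creates a heartbeat check task, ClientBeatCheckTask, when it is instantiated. Its run method: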
@Override
public void run() {
try {
if (!getDistroMapper().responsible(service.getName())) {
return;
}
if (!getSwitchDomain().isHealthCheckEnabled()) {
return;
}
// All ephemeral instances of the current service
List<Instance> instances = service.allIPs(true);
// first set health status of instances:
for (Instance instance : instances) {
// No beat received within the instance's heartbeat timeout
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
instance.setHealthy(false);
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
getPushService().serviceChanged(service);
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}
if (!getGlobalConfig().isExpireInstance()) {
return;
}
// then remove obsolete instances:
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
// No beat received within the instance's delete timeout
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
deleteIp(instance);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
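Once lastBeat is found to be too old, the instance is handled accordingly: it is first marked unhealthy and, after a longer timeout, deleted.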
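The two thresholds used by the check task can be illustrated with a small stand-alone sketch. The class `BeatTimeoutSketch`, its `Action` enum, and the 15-second and 30-second values are assumptions made for illustration (the real values come from `getInstanceHeartBeatTimeOut()` and `getIpDeleteTimeout()`), and the sketch ignores the `isMarked()` and `isExpireInstance()` guards. It also reports only the strongest action, whereas the task above runs the two checks as separate passes.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of Nacos: classifies an instance by the age of its last beat. */
final class BeatTimeoutSketch {

    enum Action { NONE, MARK_UNHEALTHY, DELETE }

    // Assumed thresholds for illustration only.
    static final long HEART_BEAT_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15);
    static final long IP_DELETE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30);

    /** Returns the strongest action that applies to a beat of the given age. */
    static Action classify(long nowMs, long lastBeatMs) {
        long age = nowMs - lastBeatMs;
        if (age > IP_DELETE_TIMEOUT_MS) {
            return Action.DELETE;
        }
        if (age > HEART_BEAT_TIMEOUT_MS) {
            return Action.MARK_UNHEALTHY;
        }
        return Action.NONE;
    }
}
```

Both comparisons are strict, as in the original task, so a beat that is exactly at the threshold does not trigger anything yet.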
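So when a client sends a heartbeat, all we need to verify is that lastBeat and the health status are updated correctly.
The entry point for receiving heartbeats is the beat method of InstanceController: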
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
ObjectNode result = JacksonUtils.createEmptyJsonNode();
// Tell the client the required beat interval (5 seconds by default)
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
RsInfo clientBeat = null;
// If beat content is present, this is not a lightweight beat; parse it into RsInfo
if (StringUtils.isNotBlank(beat)) {
clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
}
String clusterName = WebUtils
.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
if (clientBeat != null) {
if (StringUtils.isNotBlank(clientBeat.getCluster())) {
// Use the cluster name carried in the beat
clusterName = clientBeat.getCluster();
} else {
// fix #2533
clientBeat.setCluster(clusterName);
}
ip = clientBeat.getIp();
port = clientBeat.getPort();
}
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
// Look up the corresponding instance
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
// The instance does not exist
if (instance == null) {
// No beat content either, so report that the resource was not found
if (clientBeat == null) {
result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
return result;
}
Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
// Otherwise create an instance from the beat content
instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
// Register the instance
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
Service service = serviceManager.getService(namespaceId, serviceName);
// Service not found
if (service == null) {
throw new NacosException(NacosException.SERVER_ERROR,
"service not found: " + serviceName + "@" + namespaceId);
}
// For a lightweight beat (no beat content), build a minimal RsInfo to process
if (clientBeat == null) {
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(clusterName);
}
// Schedule a one-off beat processing task
service.processClientBeat(clientBeat);
// Return success
result.put(CommonParams.CODE, NamingResponseCode.OK);
if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
// Use the beat interval configured in the instance metadata instead of the global one
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
}
// Tell the client whether lightweight beats are enabled, i.e. whether later beats may omit the beat content
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}
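The branching in the beat method above depends on only two facts: whether the instance already exists on the server and whether the request carries beat content. A minimal sketch of that decision follows; `BeatDecisionSketch` and its `Outcome` enum are hypothetical names introduced here, not Nacos types.

```java
/** Hypothetical condensation of the branching in beat(); not Nacos code. */
final class BeatDecisionSketch {

    enum Outcome { RESOURCE_NOT_FOUND, REGISTER_THEN_PROCESS, PROCESS }

    static Outcome decide(boolean instanceExists, boolean hasBeatContent) {
        if (!instanceExists) {
            // Unknown instance: it can only be recreated from a full beat.
            return hasBeatContent ? Outcome.REGISTER_THEN_PROCESS : Outcome.RESOURCE_NOT_FOUND;
        }
        // Known instance: both full and lightweight beats are simply processed.
        return Outcome.PROCESS;
    }
}
```

A lightweight beat for an instance the server no longer knows is the only case that ends in RESOURCE_NOT_FOUND. This is presumably the signal for the client to fall back to sending a full beat again, although the client side is not shown in this article.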
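If the request carries beat content, it is a full beat (typically the first one) that includes the instance information; if the instance cannot be found on the server, it is registered from that content.
Either way, the beat is then handed to a client beat processor via service.processClientBeat(clientBeat):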
public void processClientBeat(final RsInfo rsInfo) {
// Create a temporary beat processor and schedule it to run once.
ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
clientBeatProcessor.setService(this);
clientBeatProcessor.setRsInfo(rsInfo);
HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
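The internals of HealthCheckReactor.scheduleNow are not shown in this article. A plausible reading, sketched below as an assumption, is that the task is handed to a scheduled executor with zero delay so that it runs once, off the request thread. `ScheduleNowSketch`, its executor, and `runAndAwait` are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a "schedule now" helper; not the Nacos implementation. */
final class ScheduleNowSketch {

    // Single daemon worker thread, so the JVM can exit without an explicit shutdown.
    private static final ScheduledExecutorService EXECUTOR =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "beat-sketch");
                t.setDaemon(true);
                return t;
            });

    /** Runs the task once, as soon as the worker thread is free. */
    static ScheduledFuture<?> scheduleNow(Runnable task) {
        return EXECUTOR.schedule(task, 0, TimeUnit.MILLISECONDS);
    }

    /** Convenience for demos: schedules the task and waits for it to finish. */
    static boolean runAndAwait(Runnable task, long timeoutMs) {
        try {
            scheduleNow(task).get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            return false;
        }
    }
}
```

Handing the work to an executor keeps the HTTP handler short: beat returns its response without waiting for the instance list to be scanned.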
Let's look at the run method of this task:
@Override
public void run() {
Service service = this.service;
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
}
// IP
String ip = rsInfo.getIp();
// Cluster name
String clusterName = rsInfo.getCluster();
// Port
int port = rsInfo.getPort();
// Look up the cluster
Cluster cluster = service.getClusterMap().get(clusterName);
// All ephemeral instances in the cluster
List<Instance> instances = cluster.allIPs(true);
// Find the matching instance and update its state
for (Instance instance : instances) {
// Update only when both IP and port match
if (instance.getIp().equals(ip) && instance.getPort() == port) {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
}
// Refresh the last beat time
instance.setLastBeat(System.currentTimeMillis());
// Not marked
if (!instance.isMarked()) {
// Currently unhealthy
if (!instance.isHealthy()) {
// Set it back to healthy
instance.setHealthy(true);
Loggers.EVT_LOG
.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
cluster.getService().getName(), ip, port, cluster.getName(),
UtilsAndCommons.LOCALHOST_SITE);
// Publish a service change event; PushService implements ApplicationListener<ServiceChangeEvent> and sends a UDP notification when it handles the event
getPushService().serviceChanged(service);
}
}
}
}
}
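The task iterates over the cluster's ephemeral instances and refreshes lastBeat for the one whose IP and port match. If that instance is not marked and is currently unhealthy, it is set back to healthy and a ServiceChangeEvent is published.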
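The refresh logic can be tried out in isolation with a stripped-down model. `BeatRefreshSketch` and its `Inst` class are hypothetical stand-ins for the processor and for Nacos's Instance, keeping only the fields the run method touches. The boolean return value replaces the call to getPushService().serviceChanged(service).

```java
import java.util.List;

/** Hypothetical, simplified model of the beat refresh; not Nacos code. */
final class BeatRefreshSketch {

    static final class Inst {
        final String ip;
        final int port;
        long lastBeat;
        boolean healthy;
        boolean marked;

        Inst(String ip, int port, boolean healthy) {
            this.ip = ip;
            this.port = port;
            this.healthy = healthy;
        }
    }

    /** Refreshes the matching instance; returns true if a health change would be pushed. */
    static boolean refresh(List<Inst> instances, String ip, int port, long nowMs) {
        boolean changed = false;
        for (Inst instance : instances) {
            if (instance.ip.equals(ip) && instance.port == port) {
                // Every beat refreshes the timestamp, healthy or not.
                instance.lastBeat = nowMs;
                // Only an unmarked, unhealthy instance flips back to healthy.
                if (!instance.marked && !instance.healthy) {
                    instance.healthy = true;
                    changed = true;
                }
            }
        }
        return changed;
    }
}
```

A second beat for the same instance still refreshes lastBeat but returns false, which mirrors the original: subscribers are notified only when the health status actually changes.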
How PushService listens for this event was analyzed in an earlier article: it notifies clients over UDP, so we won't go through it again here.
Copyright notice: this is a reposted article; copyright belongs to the original author.
Original link: https://blog.csdn.net/qq_19414183/article/details/112465952