NameServer 维护这些配置信息 、 状态信 息,其他角色都通过 NameServer 来协同执行,这章我们就来分析NameServer以及RocketMQ都通讯机制。
NameServer简介
NameServer是整个消息队列中 的状态服务器,集群的各个组件通过它来了 解全局的信息 。 同时,各个角色的机器都要定期 向 NameServer上报自己的状 态,超 时不上报的 话, NameServer 会认为 某个机器出故障不可用了,其他的组 件会把这个机器从可用列表里移除 。
NamServer可以部署多个,相互之间独立,其他角色同时向多个 NameServer 机器上报状态信息,从而达到热备份的目的。 NameServer本身是无状态的,也就 是说 NameServer 中的 Broker、 Topic 等状态信息不会持久存储,都是由各个角色 定时上报并存储到内存中的。
集群状态的存储结构
** **代码位置package org.apache.rocketmq.namesrv.routeinfo; RoutelnfoManager类中,有五 个变量 ,集群的状态就保存在这五个变量中 。
NameServer 的主要工作就是维 护这五个 变量中存储的信 息 。
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
说明:topicQueueTable 这个结构的 Key 是 Topic 的名称,它存储了所有 Topic 的属性信息 。
Value 是个 QueueData 队列 , 队里的长度 等于这 个 Topic 数据存储的 MasterBroker的个数,
QueueData里存储着 Broker的名称、 读写queue的数量、 同步标识等。
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
说明:以 BrokerName 为 索 引 ,相 同 名 称的 Broker 可能存在多台机器, 一个 Master
和多个 Slave。 这个结构存储着一个 BrokerName 对应的属性信 息,包括所属的 Cluster 名称,
一 个 Master Broker 和多个 Slave Broker 的地址信息 。
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
说明:存储的是集群中 Cluster 的信息,结果很简单,就是一个 Cluster 名称对 应一个由 BrokerName组成的集合。
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
说明:这个结构和 BrokerAddrTable有关系,但是内容完全不同,这个结构的 Key 是 BrokerAddr,也就是对应着一台 机器,
BrokerAddrTable 中的 Key 是BrokerName, 多个机器的BrokerName可以相同。 BrokerLiveTable 存储的内容是这台
Broker机器的实时状态,包括上次更新状态的时间 戳, NameServer会定期检查这个时间戳,超时没有更新就认为这个 Broker无效了,
将其从 Broker列表里清除。
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
说明:Filter Server是过滤服务器,是 RocketMQ 的一种服务端过滤方式,一 个 Broker 可以有 一个 或 多个 Filter Server。
这个结构的 Key 是 Broker 的地址, Value 是和这个 Broker关联的多个 Filter Server 的地址 。
状态维护逻辑
因为其他角色会主动向 NameServer上报状态,所以 NameServer 的主 要逻 辑在 DefaultRequest Processor类中,根据上报消息里的请求码做相 应 的处理, 更新存 储的对应 信息 。 此外,连接断开的 事 件也 会 触发状态 更新。
当NameServer和Broker的长连接断掉以后,NameServer会把这个 Broker的信息清理出去。
NameServer还有定时检查时间戳的逻辑, Broker向 NameServer发送的心 跳会更新时间戳, 当 NameServer检查到时间戳长时间没有更新后,便会触发 清理逻辑(10秒检查一次,时间戳超过 2分钟则认为 Broker已 失效。)。
Remoting 模块
RocketMQ 的通信相 关代码在 Remoting 模块里,先来 看看主要类结 构
RemotingService 为最上层接口,定义了 三个方法:
RemotingClient和 RemotingServer继承 RemotingService接口,并增加 了 自己特有的方法
NettyRemotingClient和NettyRemotingServer分 别实现了 RemotingCIient和RemotingServer,而且都继承了NettyRemotingAbstract 类 。
public class ClientRemotingProcessor implements NettyRequestProcessor {
private final Logger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory;
public ClientRemotingProcessor(final MQClientInstance mqClientFactory) {
this.mqClientFactory = mqClientFactory;
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.CHECK_TRANSACTION_STATE:
return this.checkTransactionState(ctx, request);
case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
return this.notifyConsumerIdsChanged(ctx, request);
case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
return this.resetOffset(ctx, request);
case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
return this.getConsumeStatus(ctx, request);
case RequestCode.GET_CONSUMER_RUNNING_INFO:
return this.getConsumerRunningInfo(ctx, request);
case RequestCode.CONSUME_MESSAGE_DIRECTLY:
return this.consumeMessageDirectly(ctx, request);
default:
break;
}
return null;
}
RocketMQ 中复杂的通信过程,被 RemotingCommand统一起来,大部分的逻辑都是通 过发送、接受并处理 Command 来完成的 。
协议设计和编解码
RocketMQ 自己定义了一个通信协议,使得模块间传输的二进制消息和有 意义的内容之间互相转换
Netty 库
RocketMQ是基于Netty库来完成RemotingServer和RemotingClient具体的 通信实现的, Netty是个事件驱动的网络编程框架,它屏蔽了 Java Socket、 NIO 等复杂细节,用户只需用好 Netty,就可以实现一个“网络编程专家+并发编程 专家”水平的 Server、 Client 网络程序 。 应用 Netty 有一定的门槛,需要了解它 的 EventLoopGroup、 Channel、 Handler 模型以及各种具体的配置。 RocketMQ 利用 Netty 实现的通信类是 NettyRemotingServer 和 NettyRemotingClient,用户 也可以参考这两个类的实现来学习使用 Netty。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_38003389/article/details/86677175
内容来源于网络,如有侵权,请联系作者删除!