【RocketMq实战第七篇】-NameServer

x33g5p2x  于2021-12-19 转载在 其他  
字(3.7k)|赞(0)|评价(0)|浏览(333)

前言

NameServer 维护这些配置信息 、 状态信 息,其他角色都通过 NameServer 来协同执行,这章我们就来分析NameServer以及RocketMQ都通讯机制。

正文

NameServer简介

NameServer是整个消息队列中 的状态服务器,集群的各个组件通过它来了 解全局的信息 。 同时,各个角色的机器都要定期 向 NameServer上报自己的状 态,超 时不上报的 话, NameServer 会认为 某个机器出故障不可用了,其他的组 件会把这个机器从可用列表里移除 。

NamServer可以部署多个,相互之间独立,其他角色同时向多个 NameServer 机器上报状态信息,从而达到热备份的目的。 NameServer本身是无状态的,也就 是说 NameServer 中的 Broker、 Topic 等状态信息不会持久存储,都是由各个角色 定时上报并存储到内存中的。

集群状态的存储结构

** **代码位置package org.apache.rocketmq.namesrv.routeinfo; RoutelnfoManager类中,有五 个变量 ,集群的状态就保存在这五个变量中 。 

NameServer 的主要工作就是维 护这五个 变量中存储的信 息 。

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
说明:topicQueueTable 这个结构的 Key 是 Topic 的名称,它存储了所有 Topic 的属性信息 。
 Value 是个 QueueData 队列 , 队里的长度 等于这 个 Topic 数据存储的 MasterBroker的个数, 
QueueData里存储着 Broker的名称、 读写queue的数量、 同步标识等。

private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
说明:以 BrokerName 为 索 引 ,相 同 名 称的 Broker 可能存在多台机器, 一个 Master 
和多个 Slave。 这个结构存储着一个 BrokerName 对应的属性信 息,包括所属的 Cluster 名称,
 一 个 Master Broker 和多个 Slave Broker 的地址信息 。

private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
说明:存储的是集群中 Cluster 的信息,结果很简单,就是一个 Cluster 名称对 应一个由 BrokerName组成的集合。

private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
说明:这个结构和 BrokerAddrTable有关系,但是内容完全不同,这个结构的 Key 是 BrokerAddr,也就是对应着一台 机器, 
BrokerAddrTable 中的 Key 是BrokerName, 多个机器的BrokerName可以相同。 BrokerLiveTable 存储的内容是这台 
Broker机器的实时状态,包括上次更新状态的时间 戳, NameServer会定期检查这个时间戳,超时没有更新就认为这个 Broker无效了,
将其从 Broker列表里清除。

private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
说明:Filter Server是过滤服务器,是 RocketMQ 的一种服务端过滤方式,一 个 Broker 可以有 一个 或 多个 Filter Server。 
这个结构的 Key 是 Broker 的地址, Value 是和这个 Broker关联的多个 Filter Server 的地址 。

状态维护逻辑

因为其他角色会主动向 NameServer上报状态,所以 NameServer 的主 要逻 辑在 DefaultRequest­ Processor类中,根据上报消息里的请求码做相 应 的处理, 更新存 储的对应 信息 。 此外,连接断开的 事 件也 会 触发状态 更新。

当NameServer和Broker的长连接断掉以后,NameServer会把这个 Broker的信息清理出去。

NameServer还有定时检查时间戳的逻辑, Broker向 NameServer发送的心 跳会更新时间戳, 当 NameServer检查到时间戳长时间没有更新后,便会触发 清理逻辑(10秒检查一次,时间戳超过 2分钟则认为 Broker已 失效。)。

底层通信机制

Remoting 模块

RocketMQ 的通信相 关代码在 Remoting 模块里,先来 看看主要类结 构

RemotingService 为最上层接口,定义了 三个方法:

  • void start();
  • void shutdown();
  • void registerRPCHook(RPCHook rpcHook);

RemotingClient和 RemotingServer继承 RemotingService接口,并增加 了 自己特有的方法

NettyRemotingClient和NettyRemotingServer分 别实现了 RemotingCIient和RemotingServer,而且都继承了NettyRemotingAbstract 类 。

public class ClientRemotingProcessor implements NettyRequestProcessor {
    private final Logger log = ClientLogger.getLog();
    private final MQClientInstance mqClientFactory;

    public ClientRemotingProcessor(final MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.CHECK_TRANSACTION_STATE:
                return this.checkTransactionState(ctx, request);
            case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
                return this.notifyConsumerIdsChanged(ctx, request);
            case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
                return this.resetOffset(ctx, request);
            case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
                return this.getConsumeStatus(ctx, request);

            case RequestCode.GET_CONSUMER_RUNNING_INFO:
                return this.getConsumerRunningInfo(ctx, request);

            case RequestCode.CONSUME_MESSAGE_DIRECTLY:
                return this.consumeMessageDirectly(ctx, request);
            default:
                break;
        }
        return null;
    }

RocketMQ 中复杂的通信过程,被 RemotingCommand统一起来,大部分的逻辑都是通 过发送、接受并处理 Command 来完成的 。

协议设计和编解码

RocketMQ 自己定义了一个通信协议,使得模块间传输的二进制消息和有 意义的内容之间互相转换

  • 第一部分是大端 4个字节整数,值等于第二、三、 四部分长度的总和;
  • 第二部分是大端 4 个字节整数,值等于第 三部分的长度;
  • 第三部分是通过 Json序列化的数据;
  • 第四部分是通过应用自定义二进制序列化的数据 。

Netty 库

RocketMQ是基于Netty库来完成RemotingServer和RemotingClient具体的 通信实现的, Netty是个事件驱动的网络编程框架,它屏蔽了 Java Socket、 NIO 等复杂细节,用户只需用好 Netty,就可以实现一个“网络编程专家+并发编程 专家”水平的 Server、 Client 网络程序 。 应用 Netty 有一定的门槛,需要了解它 的 EventLoopGroup、 Channel、 Handler 模型以及各种具体的配置。 RocketMQ 利用 Netty 实现的通信类是 NettyRemotingServer 和 NettyRemotingClient,用户 也可以参考这两个类的实现来学习使用 Netty。

相关文章