RocketMQ: Source Code Analysis of the Consumer-Side Message Queue Load Balancing and Redistribution Mechanism


Preface

In the previous post we learned that the PullMessageService thread is responsible for taking pull requests from pullRequestQueue and processing them.

PullMessageService#run

//Take a pull request from the pull request queue
PullRequest pullRequest = this.pullRequestQueue.take();
//Process the request
this.pullMessage(pullRequest);

But where do the PullRequest objects in pullRequestQueue come from? When are they added, and by whom?

private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

Flow Analysis

Tracing step by step from the operations that add PullRequest objects to pullRequestQueue, we end up with the following call chain:

RebalanceService#run
		↓
MQClientInstance#doRebalance
		↓
DefaultMQPushConsumerImpl#doRebalance
		↓
RebalanceImpl#doRebalance
		↓
RebalanceImpl#rebalanceByTopic
		↓
RebalanceImpl#updateProcessQueueTableInRebalance
		↓
RebalancePushImpl#dispatchPullRequest
		↓
DefaultMQPushConsumerImpl#executePullRequestImmediately
		↓
PullMessageService#executePullRequestImmediately

The call chain above shows that it is RebalanceService#run that ultimately feeds pull requests into the LinkedBlockingQueue&lt;PullRequest&gt; held by PullMessageService. Next we analyze this source, the RebalanceService.
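
Before digging into RebalanceService, the hand-off pattern itself is worth seeing in isolation: the rebalance side puts requests into a LinkedBlockingQueue while the pull side blocks on take() until one arrives. The following minimal, self-contained sketch (plain JDK; the SimplePullRequest type and the queue name are stand-ins invented for illustration, not RocketMQ classes) mirrors that interaction.

import java.util.concurrent.LinkedBlockingQueue;

public class HandOffDemo {

    //Stand-in for RocketMQ's PullRequest, used only for this illustration
    static class SimplePullRequest {
        final String messageQueue;

        SimplePullRequest(String messageQueue) {
            this.messageQueue = messageQueue;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<SimplePullRequest> pullRequestQueue = new LinkedBlockingQueue<>();

        //Plays the role of PullMessageService: blocks on take() until a request arrives
        Thread pullService = new Thread(() -> {
            try {
                while (true) {
                    SimplePullRequest request = pullRequestQueue.take();
                    System.out.println("pulling messages for " + request.messageQueue);
                }
            } catch (InterruptedException e) {
                //Interrupted while waiting: exit the loop
            }
        });
        pullService.start();

        //Plays the role of RebalanceService: hands a new pull request to the pull thread
        pullRequestQueue.put(new SimplePullRequest("TopicTest-queue-0"));

        Thread.sleep(200);
        pullService.interrupt();
    }
}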

RebalanceService

public class RebalanceService extends ServiceThread {
    //Wait interval between rebalance cycles (ms)
    private static long waitInterval =
        Long.parseLong(System.getProperty(
            "rocketmq.client.rebalance.waitInterval", "20000"));
    private final InternalLogger log = ClientLogger.getLog();
    //MQ client instance
    private final MQClientInstance mqClientFactory;

    public RebalanceService(MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            //Delegate the rebalance to the MQ client instance
            this.mqClientFactory.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }
}

RebalanceService is a service thread; its run method wakes up every waitInterval (20 seconds by default, configurable via rocketmq.client.rebalance.waitInterval) and calls MQClientInstance#doRebalance to redo the load balancing.

MQClientInstance

private final RebalanceService rebalanceService;

public void start() throws MQClientException {
    synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    ...
                    // Start the rebalance service thread
                    this.rebalanceService.start();

            }
        }
}

MQClientInstance holds a RebalanceService thread and starts it in its start method.

MQClientInstance#doRebalance

//Iterate over the MQConsumerInner of every consumer group (i.e. DefaultMQPush<Pull>ConsumerImpl) and call its doRebalance
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
    MQConsumerInner impl = entry.getValue();
    if (impl != null) {
        try {
            impl.doRebalance();
        } catch (Throwable e) {
            log.error("doRebalance exception", e);
        }
    }
}

DefaultMQPushConsumerImpl#doRebalance

@Override
public void doRebalance() {
    if (!this.pause) {
        this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
    }
}

After several layers of delegation, we finally arrive at the core class that implements message queue load distribution: RebalanceImpl.

RebalanceImpl

//Message processing queue table
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
//Message queue info per topic
protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
    new ConcurrentHashMap<String, Set<MessageQueue>>();
//Subscription info per topic
protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
    new ConcurrentHashMap<String, SubscriptionData>();
//Consumer group
protected String consumerGroup;
//Message model (clustering or broadcasting)
protected MessageModel messageModel;
//Queue allocation strategy
protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
//MQ client instance
protected MQClientInstance mQClientFactory;

RebalanceImpl#doRebalance

/**
 * Iterate over the subscription data and rebalance the message queues of every subscribed topic
 * @param isOrder whether consumption is orderly
 */
public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                //Rebalance the queues of this topic
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }
    //Remove message queues whose topic is no longer in the subscription set
    this.truncateMessageQueueNotMyTopic();
}

RebalanceImpl#rebalanceByTopic

//Get the topic's message queue info from the topic subscription cache table
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//Find all consumer IDs in the consumer group that subscribe to this topic
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
//Message model
switch (messageModel) {
    case BROADCASTING: {
	    ...	 
        break;
    }
    case CLUSTERING: {
        if (null == mqSet) {
            if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
        }

        if (null == cidAll) {
            log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
        }

        //Sort the topic's message queues and the consumer IDs
        if (mqSet != null && cidAll != null) {
            List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
            mqAll.addAll(mqSet);

            Collections.sort(mqAll);
            Collections.sort(cidAll);

            //Get the current queue allocation strategy
            AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

            List<MessageQueue> allocateResult = null;
            try {
                //Reallocate the message queues according to the strategy
                allocateResult = strategy.allocate(
                    this.consumerGroup,
                    this.mQClientFactory.getClientId(),
                    mqAll,
                    cidAll);
            } catch (Throwable e) {
                log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                    e);
                return;
            }

            Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
            if (allocateResult != null) {
                allocateResultSet.addAll(allocateResult);
            }

            //After rebalancing, update the process queue table and return whether the assignment changed
            boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
            if (changed) {
                log.info(
                    "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                    strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                    allocateResultSet.size(), allocateResultSet);
                this.messageQueueChanged(topic, mqSet, allocateResultSet);
            }
        }
        break;
    }
    default:
        break;
}
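
strategy.allocate is where the actual queue-to-consumer assignment happens. The default strategy in RocketMQ is AllocateMessageQueueAveragely, which splits the sorted queue list into contiguous slices, one per consumer. The sketch below is a self-contained, simplified approximation of that averaging idea only, not the library's exact implementation; the queue and consumer names are made up for the demo.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AveragingAllocationSketch {

    //Give each consumer a contiguous slice of the sorted queue list.
    //Simplified approximation of the "average allocation" idea.
    static List<String> allocate(String currentCid, List<String> mqAll, List<String> cidAll) {
        int index = cidAll.indexOf(currentCid);
        if (index < 0) {
            return new ArrayList<>();
        }
        int mod = mqAll.size() % cidAll.size();
        int averageSize = mqAll.size() <= cidAll.size() ? 1
            : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        List<String> result = new ArrayList<>();
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get(startIndex + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> mqAll = Arrays.asList("q0", "q1", "q2", "q3", "q4", "q5", "q6", "q7");
        List<String> cidAll = Arrays.asList("consumer-A", "consumer-B", "consumer-C");
        for (String cid : cidAll) {
            //Every consumer runs the same deterministic computation on the same sorted input,
            //which is why they all agree on the assignment without extra coordination.
            System.out.println(cid + " -> " + allocate(cid, mqAll, cidAll));
        }
    }
}

With 8 queues and 3 consumers this prints q0-q2 for consumer-A, q3-q5 for consumer-B and q6-q7 for consumer-C; because mqAll and cidAll are sorted on every client before allocation (as in the code above), each consumer independently computes the same partition.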

RebalanceImpl#updateProcessQueueTableInRebalance

boolean changed = false;		//Whether the message queue assignment has changed

Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
//Iterate over the <MessageQueue, ProcessQueue> cache table
while (it.hasNext()) {
    Entry<MessageQueue, ProcessQueue> next = it.next();
    MessageQueue mq = next.getKey();
    ProcessQueue pq = next.getValue();

    //Only handle message queues that belong to the current topic
    if (mq.getTopic().equals(topic)) {
        //The queue has been reassigned to another consumer - it is no longer in this topic's allocated Set<MessageQueue>
        if (!mqSet.contains(mq)) {
            //private volatile boolean dropped;
            //Mark the process queue as dropped to stop further pulls into it immediately
            pq.setDropped(true);
            //Check whether the queue can be removed
            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                it.remove();
                //The assignment has changed
                changed = true;
                log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
            }
        } else if (pq.isPullExpired()) {
            switch (this.consumeType()) {
                case CONSUME_ACTIVELY:
                    break;
                case CONSUME_PASSIVELY:
                    pq.setDropped(true);
                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                        it.remove();
                        changed = true;
                        log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                            consumerGroup, mq);
                    }
                    break;
                default:
                    break;
            }
        }
    }
}

RebalancePushImpl#removeUnnecessaryMessageQueue

//Persist the consume offset of the message queue before dropping it
//Stored locally (LocalFileOffsetStore) or on the Broker (RemoteBrokerOffsetStore)
this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
//Branch taken for orderly consumption in clustering mode
if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
    && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
    try {
        if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {
            try {
                return this.unlockDelay(mq, pq);
            } finally {
                pq.getConsumeLock().unlock();
            }
        } else {
            log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
                mq,
                pq.getTryUnlockTimes());

            pq.incTryUnlockTimes();
        }
    } catch (Exception e) {
        log.error("removeUnnecessaryMessageQueue Exception", e);
    }

    return false;
}
//For now we only consider non-orderly consumption - return true
return true;

RebalanceImpl#updateProcessQueueTableInRebalance

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
//Iterate over the newly allocated message queues
for (MessageQueue mq : mqSet) {
    if (!this.processQueueTable.containsKey(mq)) {
        if (isOrder && !this.lock(mq)) {
            log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
            continue;
        }

        this.removeDirtyOffset(mq);
        ProcessQueue pq = new ProcessQueue();

        long nextOffset = -1L;
        try {
            //Compute the next consume offset according to the consume-from-where policy
            //CONSUME_FROM_LAST_OFFSET/CONSUME_FROM_FIRST_OFFSET/CONSUME_FROM_TIMESTAMP
            nextOffset = this.computePullFromWhereWithException(mq);
        } catch (Exception e) {
            log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
            continue;
        }

        if (nextOffset >= 0) {
           
            ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
            //The message queue already exists in the table
            if (pre != null) {
                log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
            } else {
                //The message queue is newly assigned - add it
                log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                //Build a PullRequest for it
                PullRequest pullRequest = new PullRequest();
                pullRequest.setConsumerGroup(consumerGroup);
                pullRequest.setNextOffset(nextOffset);
                pullRequest.setMessageQueue(mq);
                pullRequest.setProcessQueue(pq);
                //Add it to the pull request list
                pullRequestList.add(pullRequest);
                changed = true;
            }
        } else {
            log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
        }
    }
}
//Dispatch the pull requests
this.dispatchPullRequest(pullRequestList);

return changed;

RebalancePushImpl#dispatchPullRequest

@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    //Iterate over the pull request list
    for (PullRequest pullRequest : pullRequestList) {
        //Trigger the message pull immediately
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
        log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
    }
}

DefaultMQPushConsumerImpl#executePullRequestImmediately

public void executePullRequestImmediately(final PullRequest pullRequest) {
    //Hand the request over to the PullMessageService thread
    this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}

PullMessageService

public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        //Put the request into the pull message request queue
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}

Summary

This article analyzed the consumer-side load balancing mechanism. First the RebalanceService thread starts and distributes message queues among the consumers; a PullRequest is built for each newly assigned MessageQueue and put into the pullRequestQueue of PullMessageService, which wakes up PullMessageService#run to take the pull request from the queue and process it. From the message-pull analysis in the previous post we know that DefaultMQPushConsumerImpl#pullMessage executes next: messages are pulled from the Broker through a remote call, with a default maximum of 32 messages per pull, and the Broker filters and packages the pulled messages before returning them. Back on the consumer side, the consumption task is then submitted to the consumer's ConsumeMessageService, which performs the actual message consumption.
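
To connect these internals back to user-facing code, here is a minimal push-consumer setup that the rebalance and pull machinery described above serves. The group name, topic, and name server address are placeholders; pullBatchSize and the allocation strategy are set explicitly only to make the defaults (32 messages per pull, AllocateMessageQueueAveragely) visible.

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;

public class RebalanceAwareConsumer {
    public static void main(String[] args) throws Exception {
        //Group, topic, and name server address are placeholder values for illustration
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("TopicTest", "*");

        //Defaults made explicit: at most 32 messages per pull,
        //queues divided evenly among the group's consumers
        consumer.setPullBatchSize(32);
        consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());

        //Messages pulled by PullMessageService are eventually delivered here via ConsumeMessageService
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                msgs.forEach(msg -> System.out.println("consumed " + msg.getMsgId()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //start() boots MQClientInstance, which in turn starts RebalanceService and PullMessageService
        consumer.start();
    }
}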

This article is intended for personal study only; if there are any omissions or mistakes, please point them out!
