在上一篇博客中我们了解到,PullMessageService
线程主要是负责从pullRequestQueue
中获得拉取消息请求并进行请求处理的。
PullMessageService#run
//在拉取消息请求队列中拉取消息请求
PullRequest pullRequest = this.pullRequestQueue.take();
//处理请求
this.pullMessage(pullRequest);
但是pullRequestQueue
中的PullRequest
是从哪来的呢?是什么时候由谁进行填充的呢?
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
通过pullRequestQueue
中的PullRequest
添加操作这个线索一步步跟踪下去,最后得出了pullRequestQueue
的调用链:
RebalanceService#run
↓
MQClientInstance#doRebalance
↓
DefaultMQPullConsumerImpl#doRebalance
↓
RebalanceImpl#doRebalance
↓
RebalanceImpl#rebalanceByTopic
↓
RebalanceImpl#updateProcessQueueTableInRebalance
↓
RebalancePushImpl#dispatchPullRequest
↓
DefaultMQPushConsumerImpl#executePullRequestImmediately
↓
PullMessageService#executePullRequestImmediately
由上面的调用链我们可以看到,向PullMessageService
中的LinkedBlockingQueue<PullRequest>
添加拉取消息请求的是RebalanceService#run
,接下来我们对这个源头RebalanceService
进行解析。
RebalanceService
public class RebalanceService extends ServiceThread {
//等待时间
private static long waitInterval =
Long.parseLong(System.getProperty(
"rocketmq.client.rebalance.waitInterval", "20000"));
private final InternalLogger log = ClientLogger.getLog();
//消息客户端
private final MQClientInstance mqClientFactory;
public RebalanceService(MQClientInstance mqClientFactory) {
this.mqClientFactory = mqClientFactory;
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
//进入mqClientFactory
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
}
RebalanceService是一个服务线程,其run方法主要是调用MQClientInstance#doRebalance进行重新负载。
MQClientInstance
private final RebalanceService rebalanceService;
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
...
// Start rebalance service
//负载均衡服务启动
this.rebalanceService.start();
}
}
}
MQClientInstance持有一个RebalanceService线程,在start方法中开启该线程。
MQClientInstance#doRebalance
//循环遍历每个消费者组中的MQConsumerInner(即DefaultMQPush<Pull>ConsumerImpl)并调用其doRebalance
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
DefaultMQPushConsumerImpl#doRebalance
@Override
public void doRebalance() {
if (!this.pause) {
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
经过多层的对象委托,终于来到实现消息负载分发的核心。
RebalanceImpl
//消息处理队列
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
//Topic的队列信息
protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
new ConcurrentHashMap<String, Set<MessageQueue>>();
//Topic订阅信息
protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
new ConcurrentHashMap<String, SubscriptionData>();
//消费者组
protected String consumerGroup;
//消费模式
protected MessageModel messageModel;
//队列分配策略
protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
//MQ客户端
protected MQClientInstance mQClientFactory;
RebalanceImpl#doRebalance
/** * 遍历订阅消息对每个主题的订阅的队列进行重新负载 * @param isOrder 是否是顺序消息 */
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
//根据Topic来对队列进行重新负载
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
//如果消息队列的Topic不在订阅的主题中-删除该消息队列
this.truncateMessageQueueNotMyTopic();
}
RebalanceImpl#rebalanceByTopic
//从主题订阅消息缓存表中获取主题的队列信息
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//查找该主题订阅组所有的消费者ID
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
//消费模式
switch (messageModel) {
case BROADCASTING: {
...
break;
}
case CLUSTERING: {
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
//对主题的消息队列和消费者ID进行排序
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
//获取当前负载均衡策略
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
//根据策略对消息队列进行重新分配
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
//重新负载后-对消息消费队列进行更新-返回消息队列负载是否发生变化
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
Rebalance#updateProcessQueueTableInRebalance
boolean changed = false; //消息队列负载是否发生变化
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
//遍历<消息队列,处理队列>缓存表
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
//如果消息队列不在该Topic处理范围内
if (mq.getTopic().equals(topic)) {
//消息队列已经被分配到其他消费者去消费了-不包含在当前主题的Set<MessageQueue>中
if (!mqSet.contains(mq)) {
//private volatile boolean dropped;
//设置当前处理队列为被丢弃-及时阻止继续向该消息处理队列进行消息拉取
pq.setDropped(true);
//判断是否需要移除
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
//发生变化
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
RebalancePushImpl#removeUnnecessaryMessageQueue
//丢弃消息队列之前先将消息队列进行持久化
//保存在本地(LocalFileOffsetStore)/消息服务器Broker(RemoteBrokerOffsetStore)
this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
//顺序消费进入的分支
if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
&& MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
try {
if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {
try {
return this.unlockDelay(mq, pq);
} finally {
pq.getConsumeLock().unlock();
}
} else {
log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
mq,
pq.getTryUnlockTimes());
pq.incTryUnlockTimes();
}
} catch (Exception e) {
log.error("removeUnnecessaryMessageQueue Exception", e);
}
return false;
}
//暂时只看非顺序消息-返回true
return true;
RebalanceImpl#updateProcessQueueTableInRebalance
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
//遍历消息队列
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = -1L;
try {
//根据不同的消息消费策略获取下一次消费的偏移量
//CONSUME_FROM_LAST_OFFSET/CONSUME_FROM_FIRST_OFFSET/CONSUME_FROM_TIMESTAMP
nextOffset = this.computePullFromWhereWithException(mq);
} catch (Exception e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
//消息队列已经存在
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
//消息队列不存在-新添加
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
//封装拉取请求PullRequest
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
//放入拉取请求列表
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
//分发消息拉取请求
this.dispatchPullRequest(pullRequestList);
return changed;
RebalancePushImpl#dispatchPullRequest
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
//遍历请求列表
for (PullRequest pullRequest : pullRequestList) {
//立刻拉取消息
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
DefaultMQPushConsumerImpl#executePullRequestImmediately
public void executePullRequestImmediately(final PullRequest pullRequest) {
//将请求丢入PullMessageService线程中
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
PullMessageService
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
//放入消息拉取请求队列中
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
本文主要解析了消息消费端的负载机制,首先RebalanceService线程启动,为消息消费者分发消息队列,每一个MessageQueue消息队列都回构建一个PullRequest,通过将这个PullRequest放入PullMessageService中的pullRequestQueue,进而唤醒PullMessageService#run,在pullRequestQueue中获得拉取消息请求并进行处理。从上一篇的的消息拉取分析中我们可以得知,接下来执行DefaultMQPushConsumerImpl#pullMessage,通过网络远程调用从Broker中拉取消息,一次最多拉取消息数量默认为32条,然后Broker将拉取的消息进行过滤并封装后返回。返回之后再回到消息消费端,将消费任务提交到消费者的ConsumerMessageService执行消息的消费。
本文仅作为个人学习使用,如有不足或错误请指正!
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/TaylorSwiftiiln/article/details/121430267
内容来源于网络,如有侵权,请联系作者删除!