生产者和消费者是消息队列的两个重要角色,生产者向消息队列写人数据, 消费者从消息队列里读取数据。本篇讲解两种类型的消费者,一个是 DefaultMQPushConsumer,由系统控制读取操作,收到消息后自动调用传人的 处理方法来处理;另 一个是 DefaultMQPullConsumer,读取操作中的大部分功 能由使用者自主控制 。
使用 DefaultMQPushConsumer 主要是设置好各种参数和传人处理消息的函数 。 系统收到消息后自动调用处理函数来处理消息,自动保存 Offset,而且加入新的 DefaultMQPushConsumer后会自动做负载均衡。
我们在安装目录下找到example项目的 package org.apache.rocketmq.example.quickstart;目录下源码介绍。
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
讲解源码
DefaultMQPushConsumer需要设置三 个 参数 : 一 是这个 Consumer 的 GroupName,二是 NameServer 的地址和端 口号,三是 Topic 的名称 ,下面将分 别进行详细介绍 。
(1) Consumer的 GroupName用于把多个 Consumer组织到一起, 提高并发 处理能力, GroupName需要和消息模式 (MessageModel)配合使用。
RocketMQ支持两种消息模式: Clustering和Broadcasting。
(2)NameServer 的地址和端口 号,可以填写多个 ,用分号隔开,达到消除单点故障的目的,比如 “ip1:port;ip2:port;ip3 :port” 。
(3)Topic名称用来标识消息类型, 需要提前创建。如果不需要消费某 个 Topic 下的所有消息,可以通过指定消息的 Tag 进行消息过滤,比如: Consumer.subscribe (”TopicTest”,’tag1 || tag2 || tag3”), 表示这个 Consumer要 消费“ TopicTest”下带有 tag1 或 tag2 或 tag3 的消息。在填写 Tag 参数的位置,用 null 或者“ *“ 表示要消费这个 Topic 的所有消息 。
处理流程
网上有很多文章讲处理流程的,个人觉得这个比较详细,https://blog.csdn.net/panxj856856/article/details/80776032把defaultMQPushConsumerImpl.start()方法按顺序都分析了。
不过我要说的重点是消息处理逻辑是在pullMessage这个函数的PullCallBack中。PullCallBack函数里有个 switch 语句,根据从 Broker 返回的消息类型做相应的 处理
switch (pullResult.getPullStatus ()) {
case FOUND:
……
break;
case NO_NEW_MSG
……
break;
case OFFSET_ILLEGAL :
……
break;
default:
break;
}
pullMessage函数的参数是 final PullRequest pullRequest ,这是通过“长轮询”方式达到 Push效果的方法,长轮询方式既有 Pull 的优点,又兼具 Push方式的实时性。
Push的方式是 Server端接收到消息后,主动把消息推送给 Client端,主动权在Server端,实时性高。用 Push方式主动推送有很多弊 端:首先是加大 Server 端的 工作量,进而影响 Server 的性能;其次,Client 的处理能力各不相同, Client 的状态不受 Server 控制,
Pull方式是 Client端循环地从 Server端拉取消息,主动权在 Client手里, 自己拉取到一定量消息后,处理妥当了再接着取。Pull 方式的问题是循环拉取 消息的间隔不好设定,间隔太短就处在一个 “忙等”的状态,浪费资源; Pull 的时间间隔太长 Server 端有消息到来时 有可能没有被及时处理。
下面就来介绍**“长轮询“**
“长轮询”方式通过 Client端和 Server端的配合,达到既拥有 Pull 的优 点,又能达到保证实时性的目的 。
大家只需记住长轮询就是在Broker在没有新消息的时候才阻塞,阻塞时间默认设置是 15秒,有消息会立刻返回
有兴趣的可以看一下长轮询源码在package org.apache.rocketmq.broker.longpolling.PullRequestHoldService.java的 run方法,里面是有三次Check,每次5s。
“长轮询”的 主动权还是 掌 握在 Consumer 手中, Broker 即使有大 量 消息积 压 ,也不会主动推 送给 Consumer 。
DefaultMQPushConsumer 的流量控制
上面我们分析得知PushConsumer的核心还是 Pull 方式。PushConsumer有个线程池 , 消息处理逻辑在各个线程里同时执行。
多线程处理业务是很麻烦的,所以RocketMQ定义了一个快照类 ProcessQueue来解决 堆积的数量 ?如何重复处理某些消息? 如何延迟处理某些消息? 等问题。每个 Message Queue 都会有个对应的 ProcessQueue 对象,保存了这个 Message Queue 消息处理状态的快照 。
ProcessQueue对象里主要的内容是一个 TreeMap 和一个读写锁。 TreeMap 里以 Message Queue 的 Offset作为 Key,以消息内容的引用为 Value,保存了 所有从 MessageQueue 获取到,但是还未被处理的消息; 读写 锁控制着多个线程对 TreeMap 对象的并发访 问 。
有 了 ProcessQueue 对象,流量控 制 就方便和灵活多了 ,客 户 端在每次 Pull请求前会做几个判断,分别取但还未处理的消息个数、消 息总大小、 Offset 的跨度,任何一个值超过设定的大小就隔一段时间再拉取消 息,从而达到流量控制的目的 。 此外 ProcessQueue 还可以辅助实现顺序消费的 逻辑。
有兴趣的可以翻看源码位置在org.apache.rocketmq.client.impl.consumer.pullMessage()
我们来看一下包中示例代码,位置在package org.apache.rocketmq.example.simple.PullConsumer;
public class PullConsumer {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
}
讲解源码
处理 逻辑是逐个 读 取某 Topic 下所有 Message Queue 的内容, 读完一遍后退出, 主要处理额外的三件事情:
( 1 )获取 Message Queue 并遍历
一 个 Topic 包括多个 Message Queue,如果这个 Consumer 需要获取 Topic 下所有的消息,就 要遍历多有的 Message Queue。 如果有特殊情况,也可以选 择某些特定的 Message Queue 来读取消息 。
( 2 )维护 Offsetstore
从一个 Message Queue 里拉取消息的时候,要传人 Offset参数( long类型 的值),随着不断读取消息 , Offset会不断增长 。 这个时候由用户负责把 Offset 存储下来,根据具体情况可以存到内存里、写到磁盘或者数据库里等 。
( 3 )根据不同的消息状态做不同的处理
拉取消息的请求发出后,会返回: FOUND、 NO_MATCHED_MSG、 NO_NEW_MSG、 OFFSET_ILLEGAL 四种状态,需要根据每个状态做不同的处理 。比较重要的两个状态是 FOUNT 和 NO NEW MSG ,分别表示获取到消息和没 有新的消息 。
对于 PullConsumer来说,使用者主动权很高,可以根据实际需要暂停、停止、启动消费过程 。 需要注意的是 Offset 的保存,要在程序的异常处理部分增加把 Offset 写人磁盘方 面的处理,记准了每个 Message Queue 的 Offset,才能保证消息消 费 的准确性 。
DefaultMQPushConsumer 的退出, 要调用 shutdown() 函数, 以便 释放资 源、保存 Offset 等 。 这个调用要加到 Consumer 所在应用的退出逻辑中 。
启动 DefaultMQPushConsumer 时, NameServer 地址填错,程序仍然 可以正常启动,但是不会收到消息 。
解决启动时NameServer填写错误报错:
可以在 Consumer.start()语句后调用: Consumer.fetchSubscribeMessageQueues(”TopicName”),这 时如果配 置信息写得不准确,或者当 前服务不可 用,这个语句会报 MQClientException 异 常 。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_38003389/article/details/86658396
内容来源于网络,如有侵权,请联系作者删除!