RocketMQ完结篇,主要使用消息队列处理一些业务场景。
顺序消息分为全局顺序消息和部分顺序消息。
全局顺序消息指某个 Topic 下的 所有消息都要保证顺序;
部分顺序消息只要保证每一组消息被顺序消费即可,如订单消息的例子,只要保证 同一个订单 ID 的三个消息能按顺序消费 即可。
全局顺序消息
要保 证全局顺序消息, 需要 先把 Topic 的读写队列数设置为 一,然后 Producer 和 Consumer 的并发设置也要是一 。 简单来说,为了保证整个 Topic 的 全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理 。 这时 高并发、高吞吐量的功能完全用不上了 。
部分顺序消息
要保证部分消息有序,需要发送端和消费端配合处理 。 在发送端,要做到 把同一业务 ID 的消息发送到同一个 Message Queue ;在消费过程中,要做到从 同一个 Message Queue 读取的消息不被并发处理,这样才能达到部分有序 。
发送端使用 MessageQueueSelector类来控制 把消息发往哪个 Message Queue,如代码:
for (int i= 0; i <100; i++) {
int orderid = i ;
Message msg =new Message ("OrderTopic8”, tags,”KEY" + i ,
(”Hello RocketMQ " +orderid+” ” + i) .getBytes (RemotingHelper . DEFAULT_CHARSET)) ;
SendResult sendResult = Producer.send(msg, new MessageQueueSelector()
@Override
public MessageQueue select (List<MessageQueue> mqs , Message msg ,Object arg) {
System.out.println(”queue selector mq nums:”+mqs.size());
System.out.println (”msg info : ”+msg . toString()) ;
for (MessageQueue mq: mqs) {
System.out.println (mq.toString()) ;
}
Integer id = (Integer) arg ;
int index = id % mqs.size();
return mqs.get(index) ;
}
}, orderid );
System .out.println(sendResult)
}
RocketMQ 选 择了确保一定投递,保证消息不丢失,但有可能造成消息重复 。setRetryTimesWhenSendFailed, 设置在同步方式下自动重试的次数,默认值是 2,这样当第一次发送消息时, Broker端接收到了消息但 是没有正 确返回发送成功的状态,就造成了消息 重复。
消息重复一般情况下不会发生,但是如果消息量大,网络有波动,消息重 复就是个大概率事件。
解决消息重复有两种方法:
RocketMQ 是个先人先出的队列,不支持消息级别或者 Topic 级别的优先级 。可以通过间接的方式解决,下面列举三种优先级相 关需求的具体处理方法 。
第一种
多个不同的消息类型使用同一个topic时,由于某一个种消息流量非常大,导致其他类型的消息无法及时消费,造成不公平,所以把流量大的类型消息在一个单独的 Topic,其他类型消息在另外一个 Topic,应用程序创建两个 Consumer,分别订阅不同的 Topic,这样就可以了。
第二种
情况和第一种情况类似,但是不用创建大量的 Topic。举个实际应用场景: 一个订单处理系统,接收从 100家快递门店过来的请求,把这些请求 通过 Producer 写人 RocketMQ ;订单处理程序通过 Consumer 从队列里读取消 息并处理,每天最多处理 1 万单 。 如果这 100 个快递门店中某几个门店订单量 大增,比如门店一接了个大客户,一个上午就发出 2万单消息请求,这样其他 的 99 家门店可能被迫等待门店一的 2 万单处理完,也就是两天后订单才能被处 理,显然很不公平 。
这时可以创建 一 个 Topic, 设置 Topic 的 MessageQueue 数 量 超过 100 个,Producer根据订单的门店号,把每个门店的订单写人 一 个 MessageQueue。 DefaultMQPushConsumer默认是采用循环的方式逐个读取一个 Topic 的 所有 MessageQueue,这样如果某家门店订单 量 大增,这家门店对应的 MessageQueue 消息数增多,等待时间增长,但不会造成其他家门店 等 待时间 增长。
DefaultMQPushConsumer 默认的 pullBatchSize 是 32,也就是每次从某个 MessageQueue 读取消息的时候,最多可以读 32 个 。 在上面的场景中,为了更 加公平,可以把 pullBatchSize 设置成 1。
第三种
强制优先级
TypeA、 TypeB、 TypeC 三类消息 。 TypeA 处于第 一优先级,要确保只要有 TypeA消息,必须优先处理; TypeB处于第二优先 级; TypeC 处于第 三 优先级 。 对这种要求,或者逻辑更复杂的要求,就要用 户自己编码实现优先级控制,如果上述的 三 类消息在一个 Topic 里,可以使 用 PullConsumer,自主控制 MessageQueue 的遍历,以及消息的读取;如果 上述三类消息在三个 Topic下,需要启动三个 Consumer, 实现逻辑控制三个 Consumer 的消费 。
(1)提高消费并行度
在同一个 ConsumerGroup 下( Clustering 方式),可以通过增加 Consumer 实例的数量来提高并行度,通过加机器,或者在 已有机器中启动 多个 Consumer 进程都可以增加 Consumer实例数。注意总的 Consumer数量不要超过 Topic下 Read Queue 数量,超过的 Consumer 实例接收不到消息。
(2)以批量方式进行消费
多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中涉及 update 某个数据库, 一次 update10条的时间会大大小于十次 update1条数据的时 间 。 这时可以通过批量方式消费来提高消费的吞吐 量 。
实现方法是设置 Consumer 的 consumeMessageBatchMaxSize 这个参数 ,默 认是 1,如果设置为 N,在消息多的时候每次收到的是个长度为 N的消息链表。
(3)检测延时情况,跳过非重要消息
public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
long Offset = msgs.get (0).getQueueOffset() ;
String maxOffset = msgs.get(0).getProperty(Message PROPERTY MAX OFFSET);
long d iff = Long.parseLong(maxOffset) - Offset;
if (diff > 90000) {
return ConsumeConcurrentlyStatus .CONSUME SUCCESS;
//正常消费消息
return ConsumeConcurrentlyStatus . CONSUME SUCCESS;
}
当某个队列的消息数堆积到 90000条以上,就直接丢弃,以 便快速追上发送消息的进度 。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_38003389/article/details/86686308
内容来源于网络,如有侵权,请联系作者删除!