rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是顺序消费消息的;
有时候,我们需要实现顺序消费一批消息,比如电商系统,订单创建,支付,完成等操作,需要顺序执行;
RocketMQTemplate
给我们提供了SendOrderly方法(有多个重载),来实现发送顺序消息;包括以下:
syncSendOrderly,发送同步顺序消息;
asyncSendOrderly,发送异步顺序消息;
sendOneWayOrderly,发送单向顺序消息;
一般我们用第一种发送同步顺序消息;
第三个参数hashKey
,方法点进去:
因为broker会管理多个消息队列,这个hashKey参数,主要用来计算选择队列的,一般可以把订单ID,产品ID作为参数值;
发送到一个队列,这样方便搞顺序队列;
以及消费端接收的时候,默认是并发多线程去接收消息。RocketMQMessageListener有个属性consumeMode,默认是ConsumeMode.CONCURRENTLY ,我们要改成ConsumeMode.ORDERLY,单线程顺序接收消息;
下面给具体事例,模拟两个订单发送消息;
消息生产者端:
/** * 发送同步顺序消息 */
public void sendOrderlyMessage(){
// hashKey用来计算决定消息发送到哪个消息队列 一般是订单ID,产品ID等
rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,创建", "98456231");
rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,支付", "98456231");
rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,完成", "98456231");
rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,创建", "98456232");
rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,支付", "98456232");
rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,完成", "98456232");
}
消费者端:
/** * 消息消费者 * @author java1234_小锋 * @site www.java1234.com * @company 南通小锋网络科技有限公司 * @create 2021-08-22 22:40 */
@RocketMQMessageListener(topic = "java1234-rocketmq-orderly",consumerGroup ="${rocketmq.consumer.group}",consumeMode =ConsumeMode.ORDERLY )
@Component
public class ConsumerService implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("消费者:收到消息内容:"+s);
}
}
运行测试:
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/caoli201314/article/details/120218194
内容来源于网络,如有侵权,请联系作者删除!