9 SpringBoot整合RocketMQ实现顺序消息

x33g5p2x  于2021-09-19 转载在 Spring  
字(1.7k)|赞(0)|评价(0)|浏览(924)

rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是顺序消费消息的;

有时候,我们需要实现顺序消费一批消息,比如电商系统,订单创建,支付,完成等操作,需要顺序执行;

RocketMQTemplate给我们提供了SendOrderly方法(有多个重载),来实现发送顺序消息;包括以下:

syncSendOrderly,发送同步顺序消息;

asyncSendOrderly,发送异步顺序消息;

sendOneWayOrderly,发送单向顺序消息;

一般我们用第一种发送同步顺序消息;

第三个参数hashKey,方法点进去:

因为broker会管理多个消息队列,这个hashKey参数,主要用来计算选择队列的,一般可以把订单ID,产品ID作为参数值;

发送到一个队列,这样方便搞顺序队列;

以及消费端接收的时候,默认是并发多线程去接收消息。RocketMQMessageListener有个属性consumeMode,默认是ConsumeMode.CONCURRENTLY ,我们要改成ConsumeMode.ORDERLY,单线程顺序接收消息;

下面给具体事例,模拟两个订单发送消息;

消息生产者端:

/** * 发送同步顺序消息 */
public void sendOrderlyMessage(){
	// hashKey用来计算决定消息发送到哪个消息队列 一般是订单ID,产品ID等
	rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,创建", "98456231");
	rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,支付", "98456231");
	rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,完成", "98456231");

	rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,创建", "98456232");
	rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,支付", "98456232");
	rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,完成", "98456232");
}

消费者端:

/** * 消息消费者 * @author java1234_小锋 * @site www.java1234.com * @company 南通小锋网络科技有限公司 * @create 2021-08-22 22:40 */
@RocketMQMessageListener(topic = "java1234-rocketmq-orderly",consumerGroup ="${rocketmq.consumer.group}",consumeMode =ConsumeMode.ORDERLY )
@Component
public class ConsumerService implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("消费者:收到消息内容:"+s);
    }

}

运行测试:

相关文章