文章11 | 阅读 6135 | 点赞0
楔子:有句老话在电视上大概已经听得生茧:我们不成功便成仁。最终是要完成任务。
RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。通过事务消息达到分布式事务的最终一致。
Apache RocketMQ 在 4.3.0 版中已经支持分布式事务消息,它采用了 2PC 的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息。如下图所示:
上图说明了事务消息的大致方案,其中可以分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
一】事务消息发送及提交:
1)Producer 向 Broker 发送消息(half 消息)
2)服务端响应消息写入结果
3)Producer 根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行)
4)Producer 根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见)
二】补偿流程:
5)对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”
6)Producer 收到回查消息,检查回查消息对应的本地事务的状态
7)根据本地事务状态,重新 Commit 或者 Rollback
其中,补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者异常失败的情况。
事务消息共有三种状态,提交状态、回滚状态、中间状态:
package org.apache.rocketmq.client.producer;
public enum LocalTransactionState {
COMMIT_MESSAGE,
ROLLBACK_MESSAGE,
UNKNOW;
private LocalTransactionState() {
}
}
这里通过一个常用场景简单模拟 RocketMQ 的事务消息:写2个微服务,分别是订单服务和商品服务。订单服务进行下单处理,并发送消息给商品服务,商品服务对下单成功的商品进行扣减库存。
首先,写一个简易订单服务,使用 TransactionMQProducer 类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行状态回传。回传的三种事务状态如上所述。
package com.meiwei.service.mq.tcp.producer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 订单服务
*/
public class OrderService {
// Topic 为 Message 所属的一级分类,就像学校里面的初中、高中
// Topic 名称长度不得超过 64 字符长度限制,否则会导致无法发送或者订阅
private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
// Tag 为 Message 所属的二级分类,比如初中可分为初一、初二、初三;高中可分为高一、高二、高三
private static final String MQ_CONFIG_TAG_PUSH = "PID_MEIWEI_SMS_ORDER_TRANSACTION";
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer();
// 指定 NameServer 地址列表,多个nameServer地址用半角分号隔开。此处应改为实际 NameServer 地址
// NameServer 的地址必须有,但也可以通过启动参数指定、环境变量指定的方式设置,不一定要写死在代码里
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setProducerGroup("meiwei-producer-transaction-mq");
// 自定义线程池,执行事务操作
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("meiwei-order-service-transaction-msg-check");
return thread;
}
});
producer.setExecutorService(executor);
// 设置事务消息监听器
producer.setTransactionListener(new OrderTransactionListener());
// 在发送MQ消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
producer.start();
System.out.println("Order Server Start.");
// 模拟业务
for (int i = 0; i < 5; i++) {
String orderId = System.currentTimeMillis() + "";
String payOrder = "下单完成,订单编号:" + orderId;
Message message = new Message(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_PUSH, orderId, payOrder.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送事务消息
TransactionSendResult result = producer.sendMessageInTransaction(message, orderId);
System.out.println("【发送事务消息】发送结果:" + result);
Thread.sleep(100);
}
}
}
事务消息需要注册一个 TransactionListener,进行本地事务的执行和事务回查,代码如下:
package com.meiwei.service.mq.tcp.producer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 订单事务消息监听器
*/
public class OrderTransactionListener implements TransactionListener {
private static final Map<String, Integer> statusMap = new ConcurrentHashMap<>();
// 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
String orderId = (String) o;
// 记录本地事务执行结果
Integer status = this.executeTransactionResult(orderId);
System.out.println("【订单事务消息监听器】本地有新订单,orderId: " + orderId + ", result: " + status);
// 返回中间状态,需要检查消息队列来确定状态,即触发 checkLocalTransaction
return LocalTransactionState.UNKNOW;
}
// 检查本地事务状态,并回应消息队列的检查请求
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
Integer status = statusMap.get(messageExt.getKeys());
System.out.println("【订单事务消息监听器】执行事务消息回查,orderId: " + messageExt.getKeys() + ", result: " + status + ",时间: " + new Date());
if (null != status) {
switch (status) {
case 0:
// 中间状态,它代表需要检查消息队列来确定状态
return LocalTransactionState.UNKNOW;
case 1:
// 提交事务,它允许消费者消费此消息
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
// 回滚事务,它代表该消息将被删除,不允许被消费
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
// 模拟一个业务场景,并返回订单处理状态
private Integer executeTransactionResult(String orderId) {
Integer status = Math.toIntExact(Long.valueOf(orderId) % 3);
statusMap.put(orderId, status);
return status;
}
}
再次,写一个简易的商品服务,接收订单服务的事务消息,如果消息成功 commit,则进行本地扣减库存。
package com.meiwei.service.mq.tcp.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
/**
* 商品服务
*/
public class DrugsService {
// Topic 为 Message 所属的一级分类,就像学校里面的初中、高中
// Topic 名称长度不得超过 64 字符长度限制,否则会导致无法发送或者订阅
private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
// Tag 为 Message 所属的二级分类,比如初中可分为初一、初二、初三;高中可分为高一、高二、高三
private static final String MQ_CONFIG_TAG_PUSH = "PID_MEIWEI_SMS_ORDER_TRANSACTION";
public static void main(String[] args) throws Exception {
// 声明并初始化一个 consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
// 指定 NameServer 地址列表,多个nameServer地址用半角分号隔开。此处应改为实际 NameServer 地址
// NameServer 的地址必须有,但也可以通过启动参数指定、环境变量指定的方式设置,不一定要写死在代码里
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumerGroup("meiwei-consumer-transaction-mq");
// 设置 consumer 所订阅的 Topic 和 Tag,这里的 Topic 需要与生产者保持一致
consumer.subscribe(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_PUSH);
// 注册一个消息监听器消费消息
consumer.registerMessageListener(new DrugsMqConsumerListener());
consumer.start();
System.out.println("Drugs Service Start.");
}
}
package com.meiwei.service.mq.tcp.consumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
/**
* 商品消费端监听器
*/
public class DrugsMqConsumerListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
Optional.ofNullable(list).orElse(Collections.emptyList()).forEach(msg -> {
String orderId = msg.getKeys();
System.out.println("【商品消费端监听器】您有新订单,orderId: " + orderId + ",商品库存需要更新");
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
订单服务作为生产者发出新订单扣减库存消息:
Order Server Start.
【订单事务消息监听器】本地有新订单,orderId: 1571021207194, result: 1
【发送事务消息】发送结果:SendResult [sendStatus=SEND_OK, msgId=0A06341B156C18B4AAC24542D3BF0000, offsetMsgId=null, messageQueue=MessageQueue [topic=TOPIC_MEIWEI_SMS_NOTICE_TEST, brokerName=YYW-SH-PC-1454, queueId=1], queueOffset=1620]
【订单事务消息监听器】本地有新订单,orderId: 1571021207595, result: 0
【发送事务消息】发送结果:SendResult [sendStatus=SEND_OK, msgId=0A06341B156C18B4AAC24542D42B0001, offsetMsgId=null, messageQueue=MessageQueue [topic=TOPIC_MEIWEI_SMS_NOTICE_TEST, brokerName=YYW-SH-PC-1454, queueId=2], queueOffset=1621]
【订单事务消息监听器】本地有新订单,orderId: 1571021207696, result: 2
【发送事务消息】发送结果:SendResult [sendStatus=SEND_OK, msgId=0A06341B156C18B4AAC24542D4900002, offsetMsgId=null, messageQueue=MessageQueue [topic=TOPIC_MEIWEI_SMS_NOTICE_TEST, brokerName=YYW-SH-PC-1454, queueId=3], queueOffset=1622]
【订单事务消息监听器】本地有新订单,orderId: 1571021207798, result: 2
【发送事务消息】发送结果:SendResult [sendStatus=SEND_OK, msgId=0A06341B156C18B4AAC24542D4F60003, offsetMsgId=null, messageQueue=MessageQueue [topic=TOPIC_MEIWEI_SMS_NOTICE_TEST, brokerName=YYW-SH-PC-1454, queueId=0], queueOffset=1623]
【订单事务消息监听器】本地有新订单,orderId: 1571021207900, result: 2
【发送事务消息】发送结果:SendResult [sendStatus=SEND_OK, msgId=0A06341B156C18B4AAC24542D55C0004, offsetMsgId=null, messageQueue=MessageQueue [topic=TOPIC_MEIWEI_SMS_NOTICE_TEST, brokerName=YYW-SH-PC-1454, queueId=1], queueOffset=1624]
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207194, result: 1,时间: Mon Oct 14 10:46:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:46:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207798, result: 2,时间: Mon Oct 14 10:46:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207696, result: 2,时间: Mon Oct 14 10:46:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207900, result: 2,时间: Mon Oct 14 10:46:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:47:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:48:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:49:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:50:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:51:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:52:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:53:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:54:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:55:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:56:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:57:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:58:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 10:59:56 CST 2019
【订单事务消息监听器】执行事务消息回查,orderId: 1571021207595, result: 0,时间: Mon Oct 14 11:00:56 CST 2019
商品服务作为消费者监听并消费到扣减库存的消息,后面就可以进行扣减库存的业务处理:
从上面的生产者输出日志可以看到订单服务发出的 5 条消息:
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/itanping/article/details/101372336
内容来源于网络,如有侵权,请联系作者删除!