项目源码: https://gitee.com/benwang6/seata-samples
TCC 与 Seata AT 事务一样都是两阶段事务,它与 AT 事务的主要区别为:
TCC 对业务代码侵入严重
每个阶段的数据操作都要自己进行编码来实现,事务框架无法自动处理。
TCC 效率更高
不必对数据加全局锁,允许多个事务同时操作数据。
以账户服务为例,当下订单时要扣减用户账户金额:
假如用户购买 100 元商品,要扣减 100 元。
TCC 事务首先对这100元的扣减金额进行预留,或者说是先冻结这100元:
如果第一阶段能够顺利完成,那么说明“扣减金额”业务(分支事务)最终肯定是可以成功的。当全局事务提交时, TC会控制当前分支事务进行提交,如果提交失败,TC 会反复尝试,直到提交成功为止。
当全局事务提交时,就可以使用冻结的金额来最终实现业务数据操作:
如果全局事务回滚,就把冻结的金额进行解冻,恢复到以前的状态,TC 会控制当前分支事务回滚,如果回滚失败,TC 会反复尝试,直到回滚完成为止。
多个TCC全局事务允许并发,它们执行扣减金额时,只需要冻结各自的金额即可:
Seata 支持 TCC 事务模式,与 AT 模式相同的,也需要以下组件来支持全局事务的控制:
下一节,我们还是以订单业务为例,来演示 Seata 如何实现 TCC 事务。
工程命名为 seata-tcc,存放到 seata-samples 文件夹下,与 seata-at 工程存放在一起:
无事务版本的4个文件夹,解压缩到工程目录
1.下载项目代码
2.访问 git 仓库
访问项目标签
下载无事务版
压缩文件中的 7 个项目目录解压缩到 seata-tcc 目录:
- project structure 中添加moudle
- double shift搜索add maven project
然后选择 seata-tcc 工程目录下的 7 个项目的 pom.xml 导入:
拖拽pom.xml文件,右键add as maven project
在订单项目中执行添加订单:
我们要添加以下 TCC 事务操作的代码:
Try - 第一阶,冻结数据阶段,向订单表直接插入订单,订单状态设置为0(冻结状态)。
Confirm - 第二阶段,提交事务,将订单状态修改成1(正常状态)。
Cancel - 第二阶段,回滚事务,删除订单。
打开 order-parent 中注释掉的 seata 依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.tedu</groupId>
<artifactId>order-parent</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>order-parent</name>
<modules>
<module>account</module>
<module>storage</module>
<module>order</module>
</modules>
<properties>
<mybatis-plus.version>3.3.2</mybatis-plus.version>
<druid-spring-boot-starter.version>1.1.23</druid-spring-boot-starter.version>
<seata.version>1.3.0</seata.version>
<spring-cloud-alibaba-seata.version>2.0.0.RELEASE</spring-cloud-alibaba-seata.version>
<spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid-spring-boot-starter.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<version>${spring-cloud-alibaba-seata.version}</version>
<exclusions>
<exclusion>
<artifactId>seata-all</artifactId>
<groupId>io.seata</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>${seata.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
spring:
application:
name: order
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost/seata_order?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
username: root
password: root
cloud:
alibaba:
seata:
tx-service-group: order_tx_group
server:
port: 8083
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka
instance:
prefer-ip-address: true
instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}
mybatis-plus:
type-aliases-package: cn.tedu.order.entity
mapper-locations:
- classpath:/mapper/*Mapper.xml configuration: map-underscore-to-camel-case: true logging: level: cn.tedu.order.mapper: DEBUG ribbon: MaxAutoRetriesNextServer: 0 #默认1
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "eureka"
nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
eureka {
# 连接eureka,要从注册表发现 seata-server
serviceUrl = "http://localhost:8761/eureka"
# application = "default"
# weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
password = ""
cluster = "default"
timeout = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
username = ""
password = ""
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
type = "file"
nacos {
serverAddr = "localhost"
namespace = ""
group = "SEATA_GROUP"
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
app.id = "seata-server"
apollo.meta = "http://192.168.1.204:8801"
namespace = "application"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
username = ""
password = ""
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = true
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#transaction service group mapping
# order_tx_group 与 yml 中的 “tx-service-group: order_tx_group” 配置一致
# “seata-server” 与 TC 服务器的注册名一致
# 从eureka获取seata-server的地址,再向seata-server注册自己,设置group
# order_tx_group 事务组,对应使用哪个协调器
# seata-server 是注册表中的服务id
vgroupMapping.order_tx_group = "seata-server"
#only support when registry.type=file, please don't set multiple addresses
order_tx_group.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
undo {
dataValidation = true
logSerialization = "jackson"
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}
根据前面的分析,订单数据操作有以下三项:
在 OrderMapper 中已经有插入订单的方法了,现在需要添加修改订单和删除订单的方法(删除方法从BaseMapper继承):
package cn.tedu.order.mapper;
import cn.tedu.order.entity.Order;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface OrderMapper extends BaseMapper<Order> {
//创建正常订单
void create(Order order);
//创建冻结订单
void createFrozen(Order order);
//修订订单状态
void updateStatus(Long orderId,Integer status);
//取消回滚 删除订单,使用继承的方法deleteById()
}
那么对应的 OrderMapper.xml 中也要添加 sql:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.order.mapper.OrderMapper" >
<resultMap id="BaseResultMap" type="cn.tedu.order.entity.Order" >
<id column="id" property="id" jdbcType="BIGINT" />
<result column="user_id" property="userId" jdbcType="BIGINT" />
<result column="product_id" property="productId" jdbcType="BIGINT" />
<result column="count" property="count" jdbcType="INTEGER" />
<result column="money" property="money" jdbcType="DECIMAL" />
<result column="status" property="status" jdbcType="INTEGER" />
</resultMap>
<!--关键字必须加反引号-->
<insert id="create">
INSERT INTO `order` (`id`,`user_id`,`product_id`,`count`,`money`,`status`)
VALUES(#{id}, #{userId}, #{productId}, #{count}, #{money},1);
</insert>
<!--创建冻结订单方法-->
<insert id="createFrozen">
INSERT INTO `order` (`id`,`user_id`,`product_id`,`count`,`money`,`status`)
VALUES(#{id}, #{userId}, #{productId}, #{count}, #{money},0);
</insert>
<!--修改订单状态-->
<update id="updateStatus">
update `order` set status = #{status}
where id = #{orderId};
</update>
<!--删除订单-->
<delete id="deleteById">
delete from `order` where id=#{orderId}
</delete>
</mapper>
第一阶段
Try - prepareCreateOrder()
第二阶段
Confirm - commit()
Cancel - rollback()
package cn.tedu.order.tcc;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
import java.math.BigDecimal;
@LocalTCC
public interface OrderTccAction {
/* 为了避开seata的一个bug,不使用封装对象 而是一个个的单独传递参数 */
@TwoPhaseBusinessAction(name = "OrderTccAction")
boolean prepare(BusinessActionContext ctx,
@BusinessActionContextParameter(paramName = "orderId") Long id,
Long userId,
Long productId,
Integer count,
BigDecimal money);
boolean commit(BusinessActionContext ctx);
boolean rollback(BusinessActionContext ctx);
}
实现类:
package cn.tedu.order.tcc;
import cn.tedu.order.entity.Order;
import cn.tedu.order.mapper.OrderMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.checkerframework.checker.units.qual.A;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
@Component
public class OrderTccActionImpl implements OrderTccAction{
@Autowired
private OrderMapper orderMapper;
@Transactional
@Override
public boolean prepare(BusinessActionContext ctx, Long id, Long userId, Long productId,Integer count, BigDecimal money) {
orderMapper.createFrozen(new Order(id,userId,productId,count,money,0));
//true表示成功
//false表示失败
return true;
}
@Transactional
@Override
public boolean commit(BusinessActionContext ctx) {
//
Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
orderMapper.updateStatus(orderId, 1);
return true;
}
@Transactional
@Override
public boolean rollback(BusinessActionContext ctx) {
Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
orderMapper.deleteById(orderId);
return true;
}
}
package cn.tedu.order.tcc;
import cn.tedu.order.entity.Order;
import cn.tedu.order.mapper.OrderMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.checkerframework.checker.units.qual.A;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
@Component
public class OrderTccActionImpl implements OrderTccAction{
@Autowired
private OrderMapper orderMapper;
@Transactional
@Override
public boolean prepare(BusinessActionContext ctx, Long id, Long userId, Long productId,Integer count, BigDecimal money) {
orderMapper.createFrozen(new Order(id,userId,productId,count,money,0));
//true表示成功
//false表示失败
return true;
}
@Transactional
@Override
public boolean commit(BusinessActionContext ctx) {
//
Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
orderMapper.updateStatus(orderId, 1);
return true;
}
@Transactional
@Override
public boolean rollback(BusinessActionContext ctx) {
Long orderId = Long.valueOf(ctx.getActionContext("orderId").toString());
orderMapper.deleteById(orderId);
return true;
}
}
业务代码中不再直接保存订单数据,而是调用 TCC 第一阶段方法prepareCreateOrder(),并添加全局事务注解 @GlobalTransactional:
package cn.tedu.order.service;
import cn.tedu.order.entity.Order;
import cn.tedu.order.fegin.AccountClient;
import cn.tedu.order.fegin.EasyIdClient;
import cn.tedu.order.fegin.StorageClient;
import cn.tedu.order.mapper.OrderMapper;
import cn.tedu.order.tcc.OrderTccActionImpl;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Random;
@Service
public class OrderServiceImpl implements OrderService{
@Autowired
private OrderMapper orderMapper;
@Autowired
private EasyIdClient easyIdClient;
@Autowired
private AccountClient accountClient;
@Autowired
private StorageClient storageClient;
@Autowired
private OrderTccActionImpl tcc;
@GlobalTransactional //启动全局事务
@Override
public void create(Order order) {
//远程调用发号器,生成订单id
String s = easyIdClient.nextId("order_business");
Long id = Long.valueOf(s);
order.setId(id);
/*//先临时随机产生id,加了发号器后,这行代码删除 Long id = Math.abs(new Random().nextLong());*/
/* tcc是一个动态代理对象,不是原始的对象 用AOP切入了代码,会拦截调用,新建上下文对象并传入目标方法 */
//冻结订单
tcc.prepare(
null,
order.getId(),
order.getUserId(),
order.getProductId(),
order.getCount(),
order.getMoney()
);
//orderMapper.create(order);//创建正常订单
// TODO:远程调用库存,减少库存
storageClient.decrease(order.getProductId(),order.getCount());
// TODO:远程调用账号,扣减账户
//accountClient.decrease(order.getUserId(),order.getMoney());
}
}
按顺序启动服务:
观察控制台日志:
查看数据库表中的订单数据:
在库存项目中执行减少库存:
我们要添加以下 TCC 事务操作的代码:
Confirm - 第二阶段,提交事务,使用冻结的库存完成业务数据处理:
Cancel - 第二阶段,回滚事务,冻结的库存解冻,恢复以前的库存量:
有三个文件需要配置:
spring:
application:
name: storage
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost/seata_storage?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
username: root
password: root
cloud:
alibaba:
seata:
tx-service-group: order_tx_group
server:
port: 8082
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka
instance:
prefer-ip-address: true
instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}
mybatis-plus:
type-aliases-package: cn.tedu.storage.entity
mapper-locations:
- classpath:/mapper/*Mapper.xml
configuration:
map-underscore-to-camel-case: true
logging:
level:
cn.tedu.storage.mapper: DEBUG
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "eureka"
nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
eureka {
# 连接eureka,要从注册表发现 seata-server
serviceUrl = "http://localhost:8761/eureka"
# application = "default"
# weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
password = ""
cluster = "default"
timeout = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
username = ""
password = ""
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
type = "file"
nacos {
serverAddr = "localhost"
namespace = ""
group = "SEATA_GROUP"
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
app.id = "seata-server"
apollo.meta = "http://192.168.1.204:8801"
namespace = "application"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
username = ""
password = ""
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = true
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#transaction service group mapping
# order_tx_group 与 yml 中的 “tx-service-group: order_tx_group” 配置一致
# “seata-server” 与 TC 服务器的注册名一致
# 从eureka获取seata-server的地址,再向seata-server注册自己,设置group
# order_tx_group 事务组,对应使用哪个协调器
# seata-server 是注册表中的服务id
vgroupMapping.order_tx_group = "seata-server"
#only support when registry.type=file, please don't set multiple addresses
order_tx_group.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
undo {
dataValidation = true
logSerialization = "jackson"
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}
这三个文件的设置与上面 order 项目的配置完全相同,请参考上面订单配置一章进行配置。
根据前面的分析,库存数据操作有以下三项:
在 StorageMapper 中添加三个方法:
package cn.tedu.storage.mapper;
import cn.tedu.storage.entity.Storage;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface StorageMapper extends BaseMapper<Storage> {
//减少库存
void decrease(Long productId,Integer count);
//查询库存,用来判断有没有足够的库存
Storage findByProductId(Long productId);
//冻结库存
void updateResidueToFrozen(Long productId,Integer count);
//冻结 --> 已售出
void updateFrozenToUsed(Long productId,Integer count);
//冻结 --> 可用
void updateFrozenToResidue(Long productId,Integer count);
}
对应的 StorageMapper.xml 中也要添加 sql:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.storage.mapper.StorageMapper" >
<resultMap id="BaseResultMap" type="cn.tedu.storage.entity.Storage" >
<id column="id" property="id" jdbcType="BIGINT" />
<result column="product_id" property="productId" jdbcType="BIGINT" />
<result column="total" property="total" jdbcType="INTEGER" />
<result column="used" property="used" jdbcType="INTEGER" />
<result column="residue" property="residue" jdbcType="INTEGER" />
</resultMap>
<update id="decrease">
UPDATE storage
SET used = used + #{count},
residue = residue - #{count}
WHERE product_id = #{productId}
</update>
<!--查询库存-->
<select id="findByProductId" resultMap="BaseResultMap">
select * from storage where product_id = #{productId}
</select>
<!--冻结库存-->
<update id="updateResidueToFrozen">
update storage
set Residue=Residue-#{count},Frozen=Frozen+#{count}
where product_id = #{productId}
</update>
<!--冻结 –> 已售出-->
<update id="updateFrozenToUsed">
update storage
set Frozen=Frozen-#{count},Used=Used+#{count}
where product_id = #{productId}
</update>
<!--冻结 –> 可用-->
<update id="updateFrozenToResidue">
update storage
set Frozen=Frozen-#{count},Residue=Residue-#{count}
where product_id = #{productId}
</update>
</mapper>
Try - prepareDecreaseStorage()
Confirm - commit()
Cancel - rollback()
package cn.tedu.storage.tcc;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
/* 1.@LocalTCC 2.添加三个方法,都添加BusssinessActionContext 参数 3.T方法添加@TwoPhaseBusinessAction 4.T方法添加业务数据参数 5.需要添加到上下文对象的参数,添加@BussinessActionContextParameter */
@LocalTCC
public interface StorageTccAction {
@TwoPhaseBusinessAction(name = "StorageTccAction")
boolean prepare(BusinessActionContext ctx,
@BusinessActionContextParameter(paramName = "productId") Long productId,
@BusinessActionContextParameter(paramName = "count") Integer count);
boolean commit(BusinessActionContext ctx);
boolean rollback(BusinessActionContext ctx);
}
实现类:
package cn.tedu.storage.tcc;
import cn.tedu.storage.entity.Storage;
import cn.tedu.storage.mapper.StorageMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import rx.Producer;
@Component
public class StorageTccActionImpl implements StorageTccAction {
@Autowired
private StorageMapper storageMapper;
@Transactional
@Override
public boolean prepare(BusinessActionContext ctx, Long productId, Integer count) {
Storage storage = storageMapper.findByProductId(productId);
if (storage.getResidue() < count){
throw new RuntimeException("库存不足");
}
storageMapper.updateResidueToFrozen(productId, count);
return true;
}
@Transactional
@Override
public boolean commit(BusinessActionContext ctx) {
Long productId = Long.valueOf(ctx.getActionContext("productId").toString());
Integer count = Integer.valueOf(ctx.getActionContext("count").toString());
storageMapper.updateFrozenToUsed(productId, count);
if (Math.random()<0.8){
return false;
}
return true;
}
@Transactional
@Override
public boolean rollback(BusinessActionContext ctx) {
Long productId = Long.valueOf(ctx.getActionContext("productId").toString());
Integer count = Integer.valueOf(ctx.getActionContext("count").toString());
storageMapper.updateFrozenToResidue(productId, count);
return true;
}
}
业务代码中调用 TCC 第一阶段方法prepareDecreaseStorage(),并添加全局事务注解 @GlobalTransactional:
package cn.tedu.storage.service;
import cn.tedu.storage.mapper.StorageMapper;
import cn.tedu.storage.tcc.StorageTccAction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class StorageServiceImpl implements StorageService{
@Autowired
private StorageTccAction tcc;
@Override
public void decrease(Long productId, Integer count) {
tcc.prepare(null, productId, count);
}
}
按顺序启动服务:
观察 storage 的控制台日志:
查看数据库表中的库存数据:
扣减金额 TCC 事务分析请见Seata TCC模式-TCC模式介绍
配置
有三个文件需要配置:
application.yml
registry.conf
file.conf
这三个文件的设置与上面 order 项目的配置完全相同,请参考上面订单配置一章进行配置。
根据前面的分析,库存数据操作有以下三项:
在 AccountMapper 中添加三个方法:
添加 TCC 接口,在接口中添加以下方法:
Try - prepareDecreaseAccount()
Confirm - commit()
Cancel - rollback()
实现类:
在业务代码中调用 Try 阶段方法
业务代码中调用 TCC 第一阶段方法prepareDecreaseAccount(),并添加全局事务注解 @GlobalTransactional:
启动 account 进行测试
按顺序启动服务:
Eureka
Seata Server
Easy Id Generator
Storage
Account
Order
调用保存订单,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100
观察 account 的控制台日志:
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_53244569/article/details/121012269
内容来源于网络,如有侵权,请联系作者删除!