文章23 | 阅读 16218 | 点赞0
本文基于seata 1.3.0版本
本文接文章《Seata解析-TC处理全局事务和分支事务原理详解之全局事务开启和分支事务注册》继续介绍TC对请求的处理。本文将介绍分支状态报告请求和全局事务报告请求。
分支状态报告请求的消息类型是MessageType.TYPE_BRANCH_STATUS_REPORT,请求对象为BranchReportRequest,该请求由RM发起。该请求的作用是报告某个分支事务的状态,TC收到后更改对应BranchSession对象的状态。
处理该请求通过模板方法调用了DefaultCoordinator.doBranchReport:
protected void doBranchReport(BranchReportRequest request, BranchReportResponse response, RpcContext rpcContext)
throws TransactionException {
//调用DefaultCore的branchReport方法
core.branchReport(request.getBranchType(), request.getXid(), request.getBranchId(), request.getStatus(),
request.getApplicationData());
}
在doBranchReport方法中又调用了DefaultCore的branchReport方法,在branchReport方法中又调用了其他方法,最终调用到ATCore的branchReport方法:
public void branchReport(BranchType branchType, String xid, long branchId, BranchStatus status,
String applicationData) throws TransactionException {
//根据XID得到全局事务对象GlobalSession
GlobalSession globalSession = assertGlobalSessionNotNull(xid, true);
//根据branchId得到分支事务对象branchSession
BranchSession branchSession = globalSession.getBranch(branchId);
if (branchSession == null) {
throw new BranchTransactionException(BranchTransactionNotExist,
String.format("Could not found branch session xid = %s branchId = %s", xid, branchId));
}
//添加监听器
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
//更改分支事务状态,本请求的处理核心在下面这个方法中
globalSession.changeBranchStatus(branchSession, status);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Report branch status successfully, xid = {}, branchId = {}", globalSession.getXid(),
branchSession.getBranchId());
}
}
branchReport方法在最后调用了GlobalSession的changeBranchStatus方法,这个方法也非常简单,就是直接更改了分支事务的状态:
public void changeBranchStatus(BranchSession branchSession, BranchStatus status)
throws TransactionException {
//设置分支事务对象的状态
branchSession.setStatus(status);
//通知监听器,监听器的处理后续文章介绍
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onBranchStatusChange(this, branchSession, status);
}
}
分支状态报告请求的处理还是比较简单的,总起来说就是找到分支事务对象BranchSession,然后直接修改其状态。
全局事务报告请求的消息类型是MessageType.TYPE_GLOBAL_REPORT,请求对象为GlobalReportRequest,该请求由TM发起。该请求的作用是报告某个全局事务的状态,TC收到后更改该全局事务的状态。
全局事务报告请求的处理通过模板方法调用了协调器对象DefaultCoordinator的doGlobalReport方法,之后在doGlobalReport方法里面进一步调用到DefaultCore的globalReport方法:
public GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException {
//根据XID找到全局事务
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) {
return globalStatus;
}
//添加监听器
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
doGlobalReport(globalSession, xid, globalStatus);
return globalSession.getStatus();
}
@Override
public void doGlobalReport(GlobalSession globalSession, String xid, GlobalStatus globalStatus) throws TransactionException {
if (globalSession.isSaga()) {
//getCore(BranchType.SAGA)方法返回的对象是SagaCore,现在使用的是AT模式,不知道为什么调用SagaCore对象
getCore(BranchType.SAGA).doGlobalReport(globalSession, xid, globalStatus);
}
}
globalReport和doGlobalReport两个方法也非常简单,就是找到全局事务对象,然后调用SagaCore的doGlobalReport方法:
public void doGlobalReport(GlobalSession globalSession, String xid, GlobalStatus globalStatus) throws TransactionException {
//全局事务提交
if (GlobalStatus.Committed.equals(globalStatus)) {
removeAllBranches(globalSession);
SessionHelper.endCommitted(globalSession);
LOGGER.info("Global[{}] committed", globalSession.getXid());
}
//全局事务回滚,Finished未知,等分析TM的时候在介绍该状态
else if (GlobalStatus.Rollbacked.equals(globalStatus)
|| GlobalStatus.Finished.equals(globalStatus)) {
removeAllBranches(globalSession);
SessionHelper.endRollbacked(globalSession);
LOGGER.info("Global[{}] rollbacked", globalSession.getXid());
}
//其他状态
else {
globalSession.changeStatus(globalStatus);
LOGGER.info("Global[{}] reporting is successfully done. status[{}]", globalSession.getXid(), globalSession.getStatus());
if (GlobalStatus.RollbackRetrying.equals(globalStatus)
|| GlobalStatus.TimeoutRollbackRetrying.equals(globalStatus)
|| GlobalStatus.UnKnown.equals(globalStatus)) {
globalSession.queueToRetryRollback();
LOGGER.info("Global[{}] will retry rollback", globalSession.getXid());
} else if (GlobalStatus.CommitRetrying.equals(globalStatus)) {
globalSession.queueToRetryCommit();
LOGGER.info("Global[{}] will retry commit", globalSession.getXid());
}
}
}
可以看到上面分了三种情况处理全局事务状态,下面我们也分三种情况介绍处理逻辑。
提交全局事务调用了两个方法:removeAllBranches和SessionHelper.endCommitted。
removeAllBranches方法将GlobalSession对象中的分支事务集合清空,同时释放分支事务对数据库记录加的锁,释放锁其实就是将上一篇文章介绍的BucketLockMap对象中的数据库记录和加锁的事务id删除。SessionHelper.endCommitted将修改GlobalSession对象中的状态为已提交(GlobalStatus.Committed):
private void removeAllBranches(GlobalSession globalSession) throws TransactionException {
//得到所有的分支事务
ArrayList<BranchSession> branchSessions = globalSession.getSortedBranches();
for (BranchSession branchSession : branchSessions) {
globalSession.removeBranch(branchSession);
}
}
//下面是GlobalSession中的方法
public void removeBranch(BranchSession branchSession) throws TransactionException {
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onRemoveBranch(this, branchSession);
}
//释放分支事务加的锁
branchSession.unlock();
//删除分支事务
remove(branchSession);
}
public boolean remove(BranchSession branchSession) {
return branchSessions.remove(branchSession);
}
下面是SessionHelper.endCommitted方法:
public static void endCommitted(GlobalSession globalSession) throws TransactionException {
globalSession.changeStatus(GlobalStatus.Committed);
globalSession.end();
}
//下面是GlobalSession的方法
public void end() throws TransactionException {
// Clean locks first
clean();
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onEnd(this);
}
}
public void clean() throws TransactionException {
//下面的代码也是释放分支事务的锁,因为removeAllBranches方法已经释放过,所以下面的方法是空执行
LockerManagerFactory.getLockManager().releaseGlobalSessionLock(this);
}
回滚全局事务的首先也是执行removeAllBranches方法,所以第一步也是释放分支事务加的锁。下面我们重点看一下SessionHelper.endRollbacked:
public static void endRollbacked(GlobalSession globalSession) throws TransactionException {
GlobalStatus currentStatus = globalSession.getStatus();
//isTimeoutGlobalStatus检查当前状态是否是超时的状态
if (isTimeoutGlobalStatus(currentStatus)) {
globalSession.changeStatus(GlobalStatus.TimeoutRollbacked);
} else {
globalSession.changeStatus(GlobalStatus.Rollbacked);
}
globalSession.end();
}
public static boolean isTimeoutGlobalStatus(GlobalStatus status) {
return status == GlobalStatus.TimeoutRollbacked
|| status == GlobalStatus.TimeoutRollbackFailed
|| status == GlobalStatus.TimeoutRollbacking
|| status == GlobalStatus.TimeoutRollbackRetrying;
}
endRollbacked方法首先设置事务的状态,然后调用了globalSession.end方法,globalSession.end方法与提交全局事务中调用的是同一个,都是用于释放分支事务锁的。
endRollbacked方法在修改全局事务状态前,会比较当前事务的状态,如果当前事务状态是超时,则设置事务状态为超时回滚成功,如果不是则设置事务状态为回滚成功。
至于全局事务的状态的流转,则在分析完RM和TM之后在介绍。
如果请求中的事务状态不是前面两种,则根据请求直接修改GlobalSession的状态,之后是一个分支判断,下面重点看一下分支判断:
//事务回滚中、超时回滚重试中、未知状态
if (GlobalStatus.RollbackRetrying.equals(globalStatus)
|| GlobalStatus.TimeoutRollbackRetrying.equals(globalStatus)
|| GlobalStatus.UnKnown.equals(globalStatus)) {
globalSession.queueToRetryRollback();
LOGGER.info("Global[{}] will retry rollback", globalSession.getXid());
}
//提交重试
else if (GlobalStatus.CommitRetrying.equals(globalStatus)) {
globalSession.queueToRetryCommit();
LOGGER.info("Global[{}] will retry commit", globalSession.getXid());
}
分支判断中分别调用了queueToRetryRollback和queueToRetryCommit。
queueToRetryRollback:
public void queueToRetryRollback() throws TransactionException {
//添加监听器
this.addSessionLifecycleListener(SessionHolder.getRetryRollbackingSessionManager());
//seata提供了回滚重试管理器,下面这行代码将本全局事务对象添加到回滚重试管理器中
//回滚重试管理器后面文章介绍
SessionHolder.getRetryRollbackingSessionManager().addGlobalSession(this);
GlobalStatus currentStatus = this.getStatus();
//设置事务状态
if (SessionHelper.isTimeoutGlobalStatus(currentStatus)) {
this.changeStatus(GlobalStatus.TimeoutRollbackRetrying);
} else {
this.changeStatus(GlobalStatus.RollbackRetrying);
}
}
queueToRetryCommit:
public void queueToRetryCommit() throws TransactionException {
//设置监听器
this.addSessionLifecycleListener(SessionHolder.getRetryCommittingSessionManager());
//将本全局事务对象添加到重试提交管理器中
//重试提交管理器后面文章介绍
SessionHolder.getRetryCommittingSessionManager().addGlobalSession(this);
//设置事务状态
this.changeStatus(GlobalStatus.CommitRetrying);
}
从queueToRetryRollback和queueToRetryCommit两个方法中看出,当事务需要重试提交或者重试回滚时,都将全局事务对象加入到对应管理器中,由管理器进行后续处理。关于管理器,后面的文章详细分析。
总的来说,在处理全局事务报告请求时,分了如下几种情况:
(1)事务提交,更改事务状态为已提交,释放分支事务加的数据库记录锁;
(2)事务回滚,释放分支事务加的数据库记录锁,更改事务状态为回滚成功或者超时回滚成功;
(3)事务回滚中、超时回滚重试中、未知状态,更改事务状态,将GlobalSession对象添加到回滚重试管理器;
(4)提交重试,更改事务状态,将GlobalSession对象添加到重试提交管理器;
(5)其他状态,更改事务状态。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_38308374/article/details/108502156
内容来源于网络,如有侵权,请联系作者删除!