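AsyncWorker currently implements only branchCommit, which asynchronously deletes the undo log records after a branch transaction has committed. The branchRollback interface is not implemented yet.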
private void doBranchCommits() {
    if (ASYNC_COMMIT_BUFFER.size() == 0) {
        return;
    }
    // Drain the buffer and group the pending commits by resource id
    Map<String, List<Phase2Context>> mappedContexts = new HashMap<>();
    Iterator<Phase2Context> iterator = ASYNC_COMMIT_BUFFER.iterator();
    while (iterator.hasNext()) {
        Phase2Context commitContext = iterator.next();
        List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(commitContext.resourceId);
        if (contextsGroupedByResourceId == null) {
            contextsGroupedByResourceId = new ArrayList<>();
            mappedContexts.put(commitContext.resourceId, contextsGroupedByResourceId);
        }
        contextsGroupedByResourceId.add(commitContext);
        iterator.remove();
    }
    // One plain connection per resource; delete the undo log of every branch on it
    for (String resourceId : mappedContexts.keySet()) {
        Connection conn = null;
        try {
            try {
                DataSourceProxy dataSourceProxy = DataSourceManager.get().get(resourceId);
                conn = dataSourceProxy.getPlainConnection();
            } catch (SQLException sqle) {
                LOGGER.warn("Failed to get connection for async committing on " + resourceId, sqle);
                continue;
            }
            List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(resourceId);
            for (Phase2Context commitContext : contextsGroupedByResourceId) {
                try {
                    UndoLogManager.deleteUndoLog(commitContext.xid, commitContext.branchId, conn);
                } catch (Exception ex) {
                    // A failed delete is only logged; the loop moves on to the next branch
                    LOGGER.warn("Failed to delete undo log [" + commitContext.branchId + "/" + commitContext.xid + "]", ex);
                }
            }
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException closeEx) {
                    LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
                }
            }
        }
    }
}
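The grouping step at the top of doBranchCommits can be looked at in isolation. The following is only a minimal sketch, not the project's code: CommitGroupingSketch and CommitItem are hypothetical names, and CommitItem stands in for the buffered commit record with just the three fields the method reads (xid, branchId, resourceId). It shows the same "group by resourceId" idea written with Map.computeIfAbsent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CommitGroupingSketch {

    /** Hypothetical stand-in for a buffered phase-two commit record. */
    static final class CommitItem {
        final String xid;
        final long branchId;
        final String resourceId;

        CommitItem(String xid, long branchId, String resourceId) {
            this.xid = xid;
            this.branchId = branchId;
            this.resourceId = resourceId;
        }
    }

    /** Groups the items by resource id, keeping first-seen order of the resources. */
    static Map<String, List<CommitItem>> groupByResource(List<CommitItem> buffer) {
        Map<String, List<CommitItem>> grouped = new LinkedHashMap<>();
        for (CommitItem item : buffer) {
            // Create the per-resource list on first use, then append to it
            grouped.computeIfAbsent(item.resourceId, k -> new ArrayList<>()).add(item);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<CommitItem> buffer = Arrays.asList(
                new CommitItem("xid-1", 1L, "db1"),
                new CommitItem("xid-1", 2L, "db2"),
                new CommitItem("xid-2", 3L, "db1"));
        groupByResource(buffer).forEach((resourceId, items) ->
                System.out.println(resourceId + " -> " + items.size() + " branch(es)"));
    }
}
```

Grouping first means one connection is opened per resource rather than one per branch, which is the point of the original loop.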
When committing, ConnectionProxy.commit calls branchRegister. This happens inside the register() method invoked below:
public void commit() throws SQLException {
    if (context.inGlobalTransaction()) {
        try {
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e);
        }
        try {
            if (context.hasUndoLog()) {
                UndoLogManager.flushUndoLogs(this);
            }
            targetConnection.commit();
        } catch (Throwable ex) {
            report(false);
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                throw new SQLException(ex);
            }
        }
        report(true);
        context.reset();
    } else {
        targetConnection.commit();
    }
}
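DataSourceManager is the RM (resource manager) implementation:
branchRegister registers the branch transaction with the server through the RPC client.
On the server side, DefaultCoordinator handles the branch registration: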
@Override
protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException {
    response.setTransactionId(request.getTransactionId());
    response.setBranchId(core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
        XID.generateXID(request.getTransactionId()), request.getLockKey()));
}
DefaultCore.branchRegister
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String lockKeys) throws TransactionException {
    GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin);
    BranchSession branchSession = new BranchSession();
    branchSession.setTransactionId(XID.getTransactionId(xid));
    branchSession.setBranchId(UUIDGenerator.generateUUID());
    branchSession.setApplicationId(globalSession.getApplicationId());
    branchSession.setTxServiceGroup(globalSession.getTransactionServiceGroup());
    branchSession.setBranchType(branchType);
    branchSession.setResourceId(resourceId);
    branchSession.setLockKey(lockKeys);
    branchSession.setClientId(clientId);
    if (!branchSession.lock()) {
        throw new TransactionException(LockKeyConflict);
    }
    try {
        globalSession.addBranch(branchSession);
    } catch (RuntimeException ex) {
        throw new TransactionException(FailedToAddBranch);
    }
    return branchSession.getBranchId();
}
This registers the branch under the global transaction.
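The important ordering in branchRegister is "acquire the lock first, add the branch only if that succeeds". Below is a minimal in-memory sketch of that idea under stated assumptions: BranchLockSketch, its lock table, the sequential branch ids, and the "table:pk" lock key format are all hypothetical and are not taken from the server code, which keeps this state in its own session and lock structures.

```java
import java.util.HashMap;
import java.util.Map;

final class BranchLockSketch {

    // Hypothetical lock table: lock key -> id of the global transaction holding it
    private final Map<String, Long> lockOwners = new HashMap<>();
    private long nextBranchId = 1;

    /**
     * Tries to lock every key for the given global transaction and, on success,
     * hands out a new branch id. Throws if another transaction holds a key.
     */
    synchronized long registerBranch(long transactionId, String... lockKeys) {
        // Check all keys before taking any, so a conflict leaves no partial locks behind
        for (String key : lockKeys) {
            Long owner = lockOwners.get(key);
            if (owner != null && owner != transactionId) {
                throw new IllegalStateException("LockKeyConflict on " + key);
            }
        }
        for (String key : lockKeys) {
            lockOwners.put(key, transactionId);
        }
        return nextBranchId++;
    }

    /** Releases every lock held by the given global transaction. */
    synchronized void releaseAll(long transactionId) {
        lockOwners.values().removeIf(owner -> owner == transactionId);
    }

    public static void main(String[] args) {
        BranchLockSketch locks = new BranchLockSketch();
        System.out.println("branch " + locks.registerBranch(100L, "t_order:1"));
        try {
            locks.registerBranch(200L, "t_order:1");
        } catch (IllegalStateException e) {
            System.out.println("second transaction rejected: " + e.getMessage());
        }
    }
}
```

On the client side, a rejection like this is what surfaces as the TransactionException that ConnectionProxy.commit passes to recognizeLockKeyConflictException.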
The commit process:
Phase-one commit, taking executeUpdate as an example. The entry point is:
public int executeUpdate() throws SQLException {
    return ExecuteTemplate.execute(this, new StatementCallback<Integer, PreparedStatement>() {
        @Override
        public Integer execute(PreparedStatement statement, Object... args) throws SQLException {
            return statement.executeUpdate();
        }
    });
}
ExecuteTemplate.execute creates a different executor depending on the type of SQL statement:
public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                 StatementProxy<S> statementProxy,
                                                 StatementCallback<T, S> statementCallback,
                                                 Object... args) throws SQLException {
    if (!RootContext.inGlobalTransaction()) {
        // Just work as original statement
        return statementCallback.execute(statementProxy.getTargetStatement(), args);
    }
    if (sqlRecognizer == null) {
        sqlRecognizer = SQLVisitorFactory.get(
            statementProxy.getTargetSQL(),
            statementProxy.getConnectionProxy().getDbType());
    }
    Executor<T> executor = null;
    switch (sqlRecognizer.getSQLType()) {
        case INSERT:
            executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
            break;
        case UPDATE:
            executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
            break;
        case DELETE:
            executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
            break;
        case SELECT_FOR_UPDATE:
            executor = new SelectForUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
            break;
        default:
            executor = new PlainExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
            break;
    }
    T rs = null;
    try {
        rs = executor.execute(args);
    } catch (Throwable ex) {
        if (ex instanceof SQLException) {
            throw (SQLException) ex;
        } else {
            // Turn everything into SQLException
            // (the quoted source omitted "throw" here, which silently swallowed the error)
            throw new SQLException(ex);
        }
    }
    return rs;
}
Taking UpdateExecutor as an example, UpdateExecutor.execute goes through the template class and ends up calling the following method:
public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    if (connectionProxy.getAutoCommit()) {
        return executeAutoCommitTrue(args);
    } else {
        return executeAutoCommitFalse(args);
    }
}
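The implementation captures images of the data before and after the change and records them in the undo log, which is written and committed together with the business data. Only a rollback uses the data in the undo log, to generate the SQL statements that undo the change.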
protected T executeAutoCommitFalse(Object[] args) throws Throwable {
    TableRecords beforeImage = beforeImage();
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    TableRecords afterImage = afterImage(beforeImage);
    statementProxy.getConnectionProxy().prepareUndoLog(sqlRecognizer.getSQLType(), sqlRecognizer.getTableName(), beforeImage, afterImage);
    return result;
}
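To make the role of the before image concrete, here is a minimal sketch of how the undo statement for an UPDATE could be derived from one saved row. It is an illustration under assumptions, not the framework's undo executor: UndoSqlSketch, its method names, and the representation of a row as a column-to-value map are all hypothetical, and the table and column names are assumed to come from trusted metadata rather than user input.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class UndoSqlSketch {

    /** Builds a parameterized UPDATE that restores every non-key column of one row. */
    static String buildUpdateUndoSql(String table, String pkColumn, Map<String, Object> beforeImage) {
        List<String> assignments = new ArrayList<>();
        for (String column : beforeImage.keySet()) {
            if (!column.equals(pkColumn)) {
                assignments.add(column + " = ?");
            }
        }
        return "UPDATE " + table + " SET " + String.join(", ", assignments)
                + " WHERE " + pkColumn + " = ?";
    }

    /** Returns the bind values in placeholder order: non-key columns first, primary key last. */
    static List<Object> bindValues(String pkColumn, Map<String, Object> beforeImage) {
        List<Object> values = new ArrayList<>();
        for (Map.Entry<String, Object> entry : beforeImage.entrySet()) {
            if (!entry.getKey().equals(pkColumn)) {
                values.add(entry.getValue());
            }
        }
        values.add(beforeImage.get(pkColumn));
        return values;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps column order stable, so placeholders and values line up
        Map<String, Object> before = new LinkedHashMap<>();
        before.put("id", 1);
        before.put("name", "old");
        before.put("amount", 10);
        System.out.println(buildUpdateUndoSql("t_order", "id", before));
        System.out.println(bindValues("id", before));
    }
}
```

The after image is not needed to build the statement itself; it is useful for checking that the row has not been changed by someone else before the old values are written back.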
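Rollback was covered in the previous post, so it is not repeated here.
Because an AT transaction has already committed locally in phase one, the phase-two commit only has AsyncWorker delete the undo log asynchronously. The real commit is done in phase one.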
public BranchStatus branchCommit(String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    return asyncWorker.branchCommit(xid, branchId, resourceId, applicationData);
}
TableMetaCache:
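This class holds the cached TableMeta. (One minor issue: if the table metadata changes and the application is not restarted, the cache only expires after 15 minutes by default. This should be rare. Normally, when table metadata changes, the affected business applications also have code changes and need to be redeployed, or the compatibility of the database change with the business logic has to be considered anyway.)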
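The staleness window described above is a general property of time-based caches. The sketch below is a hand-rolled illustration of expire-after-write behaviour, not the TableMetaCache implementation: ExpiringCacheSketch, its constructor parameters, and the injected clock are assumptions made for the example, with the 15-minute figure taken from the text.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

final class ExpiringCacheSketch<K, V> {

    private static final class Entry<V> {
        final V value;
        final long loadedAtMillis;

        Entry(V value, long loadedAtMillis) {
            this.value = value;
            this.loadedAtMillis = loadedAtMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock;      // injected so expiry can be tested without sleeping
    private final Function<K, V> loader;   // e.g. a function that reads table metadata from the database

    ExpiringCacheSketch(long ttlMillis, LongSupplier clock, Function<K, V> loader) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
        this.loader = loader;
    }

    /** Returns the cached value, reloading it once it is at least ttlMillis old. */
    synchronized V get(K key) {
        long now = clock.getAsLong();
        Entry<V> entry = entries.get(key);
        if (entry == null || now - entry.loadedAtMillis >= ttlMillis) {
            entry = new Entry<>(loader.apply(key), now);
            entries.put(key, entry);
        }
        return entry.value;
    }

    public static void main(String[] args) {
        ExpiringCacheSketch<String, String> cache = new ExpiringCacheSketch<>(
                15 * 60 * 1000L, System::currentTimeMillis, table -> "meta of " + table);
        System.out.println(cache.get("t_order"));
    }
}
```

Until the entry expires, callers keep seeing the metadata as it was when it was loaded, which is exactly the window in which a schema change would go unnoticed.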
Original link: https://blog.csdn.net/weixin_42073629/article/details/106769844