Fescar Source Code Walkthrough (1): The Business Caller

x33g5p2x, reposted on 2021-12-21 under "Other"

Let's begin with a quick pass through the main flow (for reference only; any mistakes will be corrected later).

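Start with GlobalTransactional. This annotation marks the entry point of a global transaction. The GlobalTransactionScanner class scans for it automatically during bean creation and builds a proxy for the class that declares the annotated method. The interceptor attached to that proxy is GlobalTransactionalInterceptor: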

public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    final GlobalTransactional anno = getAnnotation(methodInvocation.getMethod());
    if (anno != null) {
        try {
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }

                @Override
                public int timeout() {
                    return anno.timeoutMills();
                }

                @Override
                public String name() {
                    if (anno.name() != null) {
                        return anno.name();
                    }
                    return formatMethod(methodInvocation.getMethod());
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                default:
                    throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);

            }
        }

    }
    return methodInvocation.proceed();
}
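
To make the proxy idea concrete without Spring or Fescar on the classpath, here is a minimal sketch built on the JDK's `java.lang.reflect.Proxy`. Everything in it (`DemoGlobalTx`, `OrderService`, `AnnotationProxyDemo`) is a hypothetical stand-in, not a Fescar type, and the "transaction" is only a log of begin/commit/rollback events. It also checks the annotation name against the empty default, because a Java annotation attribute can never be null.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @GlobalTransactional; not the Fescar annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoGlobalTx {
    String name() default "";
}

interface OrderService {
    String create(String item);
    String query(String item);
}

class OrderServiceImpl implements OrderService {
    @DemoGlobalTx(name = "create-order")
    public String create(String item) { return "created " + item; }

    public String query(String item) { return "found " + item; }
}

class AnnotationProxyDemo {
    // Records what the interceptor did so the behaviour can be inspected.
    static final List<String> LOG = new ArrayList<>();

    // Wraps the target in a JDK proxy that only adds behaviour to annotated methods.
    static OrderService wrap(OrderService target) {
        return (OrderService) Proxy.newProxyInstance(
            OrderService.class.getClassLoader(),
            new Class<?>[] {OrderService.class},
            (proxy, method, args) -> {
                // Look the annotation up on the implementation class, not the interface.
                Method impl = target.getClass().getMethod(method.getName(), method.getParameterTypes());
                DemoGlobalTx anno = impl.getAnnotation(DemoGlobalTx.class);
                String name = null;
                if (anno != null) {
                    // Fall back to the method name when no explicit name was given.
                    name = anno.name().isEmpty() ? method.getName() : anno.name();
                    LOG.add("begin " + name);
                }
                try {
                    Object result = method.invoke(target, args);
                    if (name != null) {
                        LOG.add("commit " + name);
                    }
                    return result;
                } catch (InvocationTargetException e) {
                    if (name != null) {
                        LOG.add("rollback " + name);
                    }
                    throw e.getCause(); // surface the original business exception
                }
            });
    }

    public static void main(String[] args) {
        OrderService service = wrap(new OrderServiceImpl());
        System.out.println(service.create("book"));
        System.out.println(service.query("book"));
        System.out.println(LOG);
    }
}
```

Only `create` carries the annotation, so only that call is logged; `query` passes straight through to the target.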

The actual transaction invocation template lives in TransactionalTemplate:

public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {

    // 1. get or create a transaction
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

    // 2. begin transaction
    try {
        tx.begin(business.timeout(), business.name());

    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.BeginFailure);

    }

    Object rs = null;
    try {

        // Do Your Business
        rs = business.execute();

    } catch (Throwable ex) {

        // 3. any business exception, rollback.
        try {
            tx.rollback();

            // 3.1 Successfully rolled back
            throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);

        } catch (TransactionException txe) {
            // 3.2 Failed to rollback
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.RollbackFailure, ex);

        }

    }

    // 4. everything is fine, commit.
    try {
        tx.commit();

    } catch (TransactionException txe) {
        // 4.1 Failed to commit
        throw new TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.CommitFailure);

    }
    return rs;
}

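tx.begin, tx.commit and the other calls here are all RPC requests. They tell the TC (the Transaction Coordinator, i.e. the Fescar server) which action is about to happen. The TC manages the state of the global transaction and then notifies the RMs (Resource Managers) to carry out the corresponding action.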
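
The call order the template produces can be observed without a running server. The sketch below is an assumption-laden simplification: `FakeTx` is a hypothetical in-memory object that only records calls, no RPC takes place, and failures are limited to unchecked exceptions, whereas the real template catches any Throwable and wraps it in an ExecutionException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class TemplateFlowDemo {
    // Hypothetical in-memory stand-in for GlobalTransaction; it only records calls.
    static class FakeTx {
        final List<String> calls = new ArrayList<>();
        void begin() { calls.add("begin"); }
        void commit() { calls.add("commit"); }
        void rollback() { calls.add("rollback"); }
    }

    // Same shape as the template: begin, run the business code,
    // roll back on any failure, otherwise commit.
    static <T> T execute(FakeTx tx, Supplier<T> business) {
        tx.begin();
        T result;
        try {
            result = business.get();
        } catch (RuntimeException ex) {
            tx.rollback();
            throw ex; // the real template wraps this with Code.RollbackDone
        }
        tx.commit();
        return result;
    }

    public static void main(String[] args) {
        FakeTx ok = new FakeTx();
        execute(ok, () -> "done");
        System.out.println("success path: " + ok.calls);

        FakeTx bad = new FakeTx();
        try {
            execute(bad, () -> { throw new IllegalStateException("boom"); });
        } catch (IllegalStateException expected) {
            System.out.println("failure path: " + bad.calls);
        }
    }
}
```

The success path records begin then commit, and the failure path records begin then rollback before the business exception reaches the caller.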

Now look at tx.commit in detail:

public void commit() throws TransactionException {
    check();
    RootContext.unbind();
    if (role == GlobalTransactionRole.Participant) {
        // Participant has no responsibility of committing
        return;
    }
    status = transactionManager.commit(xid);

}

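commit first removes the global transaction id (XID) stored in a ThreadLocal. A participant in the global transaction does not report the commit to the TC; only the initiator of the transaction does.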
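
A minimal sketch of that behaviour, assuming a plain ThreadLocal holder: `XidContextDemo`, its `Role` enum, and the sample XID string are hypothetical illustrations rather than Fescar classes, and the boolean return value merely stands in for "a commit request would be sent".

```java
class XidContextDemo {
    // One XID per thread, mirroring the idea of a ThreadLocal-backed context.
    private static final ThreadLocal<String> XID_HOLDER = new ThreadLocal<>();

    enum Role { LAUNCHER, PARTICIPANT }

    static void bind(String xid) { XID_HOLDER.set(xid); }

    static String getXid() { return XID_HOLDER.get(); }

    // Clears the thread-bound XID and returns the value that was bound.
    static String unbind() {
        String xid = XID_HOLDER.get();
        XID_HOLDER.remove();
        return xid;
    }

    // Returns true when a commit request would be sent to the coordinator.
    static boolean commit(Role role) {
        unbind(); // the XID is cleared regardless of the role
        return role == Role.LAUNCHER; // participants stay silent
    }

    public static void main(String[] args) {
        bind("demo-xid-1");
        System.out.println("participant notifies: " + commit(Role.PARTICIPANT));
        System.out.println("xid after commit: " + getXid());
        bind("demo-xid-2");
        System.out.println("launcher notifies: " + commit(Role.LAUNCHER));
    }
}
```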

The commit method in DefaultTransactionManager:

public GlobalStatus commit(String xid) throws TransactionException {
    long txId = XID.getTransactionId(xid);
    GlobalCommitRequest globalCommit = new GlobalCommitRequest();
    globalCommit.setTransactionId(txId);
    GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
    return response.getGlobalStatus();
}

The begin, commit, and rollback methods all follow this pattern: each builds its request object and sends it through syncCall.
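
The commit method above extracts a numeric transaction id from the XID string before building the request. The sketch below shows one way such a round trip could work. The `host:port:transactionId` layout is an assumption made for illustration, and `XidFormatDemo` is a hypothetical class, not Fescar's XID implementation.

```java
class XidFormatDemo {
    // Assumed layout for illustration: "<host>:<port>:<transactionId>".
    static String format(String host, int port, long transactionId) {
        return host + ":" + port + ":" + transactionId;
    }

    // Reads the numeric id after the last ':' of the assumed layout.
    static long transactionIdOf(String xid) {
        int idx = xid.lastIndexOf(':');
        if (idx < 0) {
            throw new IllegalArgumentException("not an XID: " + xid);
        }
        return Long.parseLong(xid.substring(idx + 1));
    }

    public static void main(String[] args) {
        String xid = format("127.0.0.1", 8091, 2001L);
        System.out.println(xid + " -> " + transactionIdOf(xid));
    }
}
```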

Only AT mode is supported for now. initClient in GlobalTransactionScanner throws NotSupportYetException when MT mode is requested:

private void initClient() {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Initializing Global Transaction Clients ... ");
    }
    if (StringUtils.isEmpty(applicationId) || StringUtils.isEmpty(txServiceGroup)) {
        throw new IllegalArgumentException(
            "applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup);
    }
    TMClient.init(applicationId, txServiceGroup);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info(
            "Transaction Manager Client is initialized. applicationId[" + applicationId + "] txServiceGroup["
                + txServiceGroup + "]");
    }
    if ((AT_MODE & mode) > 0) {
        RMClientAT.init(applicationId, txServiceGroup);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info(
                "Resource Manager for AT Client is initialized. applicationId[" + applicationId
                    + "] txServiceGroup["
                    + txServiceGroup + "]");
        }
    }
    if ((MT_MODE & mode) > 0) {
        throw new NotSupportYetException();
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Global Transaction Clients are initialized. ");
    }
}
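
The `(AT_MODE & mode) > 0` checks treat `mode` as a set of bit flags. A short sketch of that idiom follows; the constant values 1 and 2 are assumptions chosen so that each mode occupies its own bit, and `ModeFlagsDemo` is a hypothetical class.

```java
class ModeFlagsDemo {
    // Assumed values for illustration; each mode occupies its own bit.
    static final int AT_MODE = 1;
    static final int MT_MODE = 2;

    // True when the given flag bit is set in mode.
    static boolean enabled(int mode, int flag) {
        return (mode & flag) > 0;
    }

    public static void main(String[] args) {
        int atOnly = AT_MODE;
        int both = AT_MODE | MT_MODE; // flags combine with bitwise OR
        System.out.println("AT in atOnly: " + enabled(atOnly, AT_MODE));
        System.out.println("MT in atOnly: " + enabled(atOnly, MT_MODE));
        System.out.println("MT in both:   " + enabled(both, MT_MODE));
    }
}
```

With this encoding a single int can request several modes at once, which is why initClient tests each bit separately instead of comparing `mode` for equality.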

Dubbo passes the XID to the branch services through TransactionPropagationFilter:

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String xid = RootContext.getXID();
        String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
        }
        boolean bind = false;
        if (xid != null) {
            // consumer side
            RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
        } else {
            // provider side
            if (rpcXid != null) {
                RootContext.bind(rpcXid);
                bind = true;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("bind[" + rpcXid + "] to RootContext");
                }
            }
        }
        try {
            return invoker.invoke(invocation);

        } finally {
            if (bind) {
                String unbindXid = RootContext.unbind();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
                }
                if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                    LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
                    if (unbindXid != null) {
                        RootContext.bind(unbindXid);
                        LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
                    }
                }
            }
        }
    }
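
The filter's two branches can be simulated without Dubbo. In the sketch below a plain Map plays the role of the RPC attachments and a ThreadLocal plays the role of RootContext. `XidPropagationDemo`, `KEY_XID`, and the sample XID are hypothetical, and for brevity the consumer and the provider run on the same thread one after the other.

```java
import java.util.HashMap;
import java.util.Map;

class XidPropagationDemo {
    static final String KEY_XID = "DEMO_XID"; // hypothetical attachment key
    private static final ThreadLocal<String> LOCAL_XID = new ThreadLocal<>();

    static void bind(String xid) { LOCAL_XID.set(xid); }
    static void unbind() { LOCAL_XID.remove(); }
    static String currentXid() { return LOCAL_XID.get(); }

    // Consumer side: copy the thread-bound XID into the outgoing attachments.
    static Map<String, String> outgoing() {
        Map<String, String> attachments = new HashMap<>();
        String xid = LOCAL_XID.get();
        if (xid != null) {
            attachments.put(KEY_XID, xid);
        }
        return attachments;
    }

    // Provider side: bind the received XID for the duration of the call, then clean up.
    static String handle(Map<String, String> attachments) {
        String rpcXid = attachments.get(KEY_XID);
        boolean bound = false;
        if (LOCAL_XID.get() == null && rpcXid != null) {
            LOCAL_XID.set(rpcXid);
            bound = true;
        }
        try {
            // Stand-in for invoker.invoke(invocation): the business code sees the XID.
            return "business saw xid=" + LOCAL_XID.get();
        } finally {
            if (bound) {
                LOCAL_XID.remove(); // never leak the XID to the next request on this thread
            }
        }
    }

    public static void main(String[] args) {
        bind("demo-xid-1");                        // consumer is inside a global transaction
        Map<String, String> wire = outgoing();     // XID travels with the call
        unbind();                                  // from here on, act as the provider
        System.out.println(handle(wire));
        System.out.println("xid after call: " + currentXid());
    }
}
```

Unbinding in the `finally` block matters because server threads are pooled: a leftover XID would silently enrol an unrelated later request in the wrong global transaction.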
