Spring事务扩展篇

x33g5p2x  于2022-07-04 转载在 Spring  
字(12.8k)|赞(0)|评价(0)|浏览(428)

本系列文章:

Spring事务管理—下

Spring事务管理—中

Spring事务管理—上

Spring事务王国概览

之前我们已经深度剖析了Spring局部事务的实现,但是懂了实现还不够,我们需要从Spring的实现中学习到一些好用的设计思想,本篇文章就是来总结一下Spring事务设计的精华思想。

活用ThreadLocal

之前讲过,Spring通过ThreadLocal来避免在业务代码中进行Connetion传递的尴尬局面。但是ThreadLocal作用不只于此,或者说Spring团队使用ThreadLocal保存Connection,不只只是为了完成connection-passing,还有其他考量,那么是什么呢?

想要搞清楚,前提是理解ThreadLocal具体的作用有哪些:

  • 保障线程安全性

看上面这幅图,我们知道Connection是一个具有状态的对象,即它是非线程安全的对象,在这种多线程并发访问情况下必定存在问题,因此对于某些有状态的或者非线程安全的对象,我们可以在多线程程序中为每个对象分配相应的副本,而不是让多线程共享该类型的某个对象。

所以为了保证多个线程使用Connection进行数据访问过程中的数据安全,我们通过ThreadLocal为每个线程分配了一个他们各自持有的Connection,从而避免了对单一Connection资源的争用。
在JDBC中是一个Connection对应一个事务,如果所有的线程共用一个Connection的话,事务管理就失控了。

  • 实现当前程序执行流程内的数据传递

这种场景下,我们更多关注的是数据在单一线程环境下的数据流动过程。

例如: 我们可以通过ThreadLocal来跟踪保存在线程内的日志序列,在程序执行的任务必要执行点将系统跟踪信息添加到ThreadLocal,在何时的时间点拿出来分析。

我们还可以通过ThreadLocal保存某种全局变量,在当前线程执行到某个时间点时取出进行设置,在另外一个时间点取出进行判断。

下面给出一个具体使用场景:

采用ThreadLocal在单一线程内传递数据,可以避免耦合性很强的方法参数形式的数据传递方式。其实有点像数据在随着当前线程执行过程不断流动一样。

ThreadLocal实现原理

虽然我们将资源设置到了ThreadLocal上,但是ThreadLocal本身并不会保存这些特定的数据资源,这些资源都是绑定到了特定的线程上面。

每个Thread类都有一个map集合用来保存绑定到当前线程上的资源:

//默认为空,未进行初始化
    ThreadLocal.ThreadLocalMap threadLocals = null;

当我们通过ThreadLocal的set方法来设置数据的时候:

public void set(T value) {
       //取出当前线程
        Thread t = Thread.currentThread();
        //获取当前线程的threadLocals变量
        ThreadLocalMap map = getMap(t);
        //如果已经初始化了,那么直接往map里面设置值
        //key就是当前ThreadLocal,value就是我们要绑定的资源
        if (map != null)
            map.set(this, value);
        else
        //否则就初始化map,并且将value放入map集合中去
            createMap(t, value);
    }
    
    ThreadLocalMap getMap(Thread t) {
        //返回的是当前线程的threadLocals属性
        return t.threadLocals;
    }

    void createMap(Thread t, T firstValue) {
    //初始map,key还是当前threadLocal,value就是上面传入的资源
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }

当我们通过get方法从ThreadLocal获取值时:

public T get() {
        //获取当前线程
        Thread t = Thread.currentThread();
        //获取当前线程的threadLocals属性
        ThreadLocalMap map = getMap(t);
        //如果已经初始化过了
        if (map != null) {
        //如果能够取出当前ThreadLocal在map中关联的资源,那么直接返回
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        //设置并返回一个初始值
        return setInitialValue();
    }
     
     private T setInitialValue() {
        //初始化来自与initialValue()方法的返回值
        T value = initialValue();
        //如果map还没有初始化就初始化,然后将初始值设置进去
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
        return value;
    }
    
    //初始值默认为null,一般都建议重写该方法
    protected T initialValue() {
        return null;
    }

看完了ThreadLocal的实现原理,那么上面最开始给出的Conntion案例图应该改成下面这样:

使用ThreadLocal管理多数据源切换条件

前面,我们介绍过了如何通过AbstractRoutingDataSource 实现一个简单的多数据源切换

Spring中的DataSource

但是在多数据源切换过程中,切换的条件可能随着应用程序的需求和各异,而且,通常不会像我们的AbstractRoutingDataSource 原型那样简单,只需要内部条件就可以实现数据源切换的判断。

更多时候,需要外部条件的介入,这就会有一个问题,如何为AbstractRoutingDataSource 的实现子类传入这些外部条件相关的数据? ---->没错,就是ThreadLocal.

  • AbstractRoutingDataSource获取数据库连接的方法如下,determineTargetDataSource方法是实现动态切换的关键
@Override
	public Connection getConnection(String username, String password) throws SQLException {
		return determineTargetDataSource().getConnection(username, password);
	}
  • determineTargetDataSource方法决定采用哪个dataSource
protected DataSource determineTargetDataSource() {
		Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
		//determineCurrentLookupKey是我们需要重写的方法
		Object lookupKey = determineCurrentLookupKey();
		DataSource dataSource = this.resolvedDataSources.get(lookupKey);
		if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
		//resolvedDefaultDataSource是兜底数据源
			dataSource = this.resolvedDefaultDataSource;
		}
		if (dataSource == null) {
			throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
		}
		return dataSource;
	}
  • AbstractRoutingDataSource中我们需要准备两个属性,一个是目标dataSources集合,还有一个兜底的数据源
@Nullable
	private Map<Object, Object> targetDataSources;

	@Nullable
	private Object defaultTargetDataSource;

之所以我们上面看到的都是下面两个属性,其实他们都是一样的,只不过在afterPropertiesSet方法中会进行校验,然后提前解析一波,把上面两个属性,变成解析后的集合 (如果还不懂的话,可以自己看一下该类源码实现,核心思路已经讲了,其余的代码就比较简单了)

@Nullable
	private Map<Object, DataSource> resolvedDataSources;

	@Nullable
	private DataSource resolvedDefaultDataSource;

有了上面的回顾之后,下面我们想一想,如何通过ThreadLocal完成外部条件的介入,从而通过外部条件来实现数据源的动态切换呢?

我们可以通过ThreadLocal保存每个数据源对应的标志,然后在determineCurrentLookupKey方法中通过获取ThreadLocal中保存的标志,决定切换到哪一个数据源。

这里简单举个例子:

  • 给出一个负责保存ThreadLocal状态的工具类
public class DataSourceUtil {
    /**
     * helper数据库
     */
    public static final Integer HELPER=1;
    /**
     * training数据库
     */
    public static final Integer TRAINING=2;
    /**
     * 存放数据源标志--默认返回helper数据源
     */
    private static final ThreadLocal<Integer> DATASOURCE=ThreadLocal.withInitial(()->HELPER);

    static public Integer get(){
        return DATASOURCE.get();
    }

    static public void set(Integer val){
        DATASOURCE.set(val);
    }
}
  • 定义一个自己的实现类,重写determineCurrentLookupKey方法
@Data
public class DynamicDataSource extends AbstractRoutingDataSource {

    public DynamicDataSource() {
        //设置默认的数据源
        super.setDefaultTargetDataSource(DataSourceUtil.get());
        HashMap<Object, Object> dataSources = new HashMap<>();
        dataSources.put(DataSourceUtil.HELPER,DataSourceTestHandler.getHelperDataSource());
        dataSources.put(DataSourceUtil.TRAINING,DataSourceTestHandler.getTrainingDataSource());
        //设置目标数据源集合
        super.setTargetDataSources(dataSources);
    }

    @Override
    protected Object determineCurrentLookupKey() {
        return DataSourceUtil.get();
    }
}

剩下的就是将DynamicDataSource 注入容器即可,然后我们通过DataSourceUtil.set()方法就可以根据外部条件,完成数据源的动态切换了,这里就不演示了

活用Strategy模式

策略模式本意是封装一系列可以互相替换的算法逻辑,可以选择让用户自己选择使用哪一种算法,也可以像用户屏蔽算法的多样性,程序自动判定选择最适合当前条件下的算法。
不理解策略模式的可以先看一下下面这篇文章:

设计模式—策略模式

这里用支付做一个简单的例子: 用户在支付的时候通常有多种支付方式的选择,常见的有微信支付和支付宝支付,如果要写支付过程的代码,我猜有人会这样写:

public class PayService {
    private static final Integer ZHI_FU_BAO_PAY=1;
    private static final Integer WEI_XIN_PAY=2;
    public Boolean pay(Integer money,Integer type){
          if(type.equals(ZHI_FU_BAO_PAY)){
              //...
              return true;
          }else if(type.equals(WEI_XIN_PAY)){
              //...
              return true;
          }else {
              throw new UnsupportedOperationException("不支持的支付方式");
          }
    }
}

上面这种写法适合当前场景吗?

显然,不适合,大家可以思考,如果此时再添加进来银行卡支付,或者其他支付方式,那么每添加一次就需要增加一个分支。

并且pay方法主要精力应该放在支付上,而不是挑选支付方式上面,因此这里需要采用策略模式来优化一下:

来看一下具体应该怎么写吧:

public interface PayWay {
    boolean pay(Integer money);
}

public class BankCardPayImpl implements PayWay{
    @Override
    public boolean pay(Integer money) {
        System.out.println("使用银行卡支付: "+money);
        return true;
    }
}

public class WeiXinPayImpl implements PayWay{
    @Override
    public boolean pay(Integer money) {
        System.out.println("使用微信支付: "+money);
        return true;
    }
}

public class ZhiFuBaoPayImpl implements PayWay{
    @Override
    public boolean pay(Integer money) {
        System.out.println("使用支付宝支付: "+money);
        return true;
    }
}

准备好了相关策略实例之后,下面大家需要思考如何完成算法挑选的过程,即我们需要知道算法和标记的映射关系。

这里有两个思路:

  • 提前准备好集合存放相关算法和每个标记的映射关系
  • 通过多重条件语句,选择出合适的算法

先来看一下用集合保存映射关系的方式:

public class PayContext {
    private PayWay payWay;

    public static final Integer ZHI_FU_BAO_PAY=1;
    public static final Integer WEI_XIN_PAY=2;
    public static final Integer BANK_CARD_PAY=3;

    private static final Map<Integer,PayWay> payWayMap=new HashMap<>();

    static {
        payWayMap.put(ZHI_FU_BAO_PAY,new ZhiFuBaoPayImpl());
        payWayMap.put(WEI_XIN_PAY,new WeiXinPayImpl());
        payWayMap.put(BANK_CARD_PAY,new BankCardPayImpl());
    }

    public boolean doPay(Integer money,Integer type){
        payWay=payWayMap.get(type);
        if(payWay==null){
            throw new UnsupportedOperationException("不支持的支付类型");
        }
        return payWay.pay(money);
    }
}
public class PayService {
    PayContext payContext=new PayContext();
    public Boolean pay(Integer money,Integer type){
          return payContext.doPay(money,type);
    }
}

如果采用多重条件语句的话:

public class PayContext {
    private PayWay payWay;

    public static final Integer ZHI_FU_BAO_PAY=1;
    public static final Integer WEI_XIN_PAY=2;
    public static final Integer BANK_CARD_PAY=3;
    

    public boolean doPay(Integer money,Integer type){
        if(type.equals(ZHI_FU_BAO_PAY)){
            System.out.println("支付宝支付: "+money);
        }else if(type.equals(WEI_XIN_PAY)){
            System.out.println("微信支付: "+money);
        }else if(type.equals(BANK_CARD_PAY)){
            System.out.println("银行卡支付: "+money);
        }else {
            throw new UnsupportedOperationException("不支持的支付类型");
        }
        return payWay.pay(money);
    }
}

显然这里更适合与第一种集合存放映射关系的方式,因为付款的选择可能会动态增加或者减少,而对于多重循环来说,它更适合与选择较为固定并且选择数量比较少的情况,所以要实事求是,没有哪一种写法更好,只有哪一种写法更符合当前需求

理解Strategy模式

虽然策略模式定义上强调是对算法的封装,但是我们不应该被算法所禁锢,实际上,只要能够有效地剥离客户端代码与特定关注点之间的依赖关系,策略模式就应该被考虑。

Spring事务就是一个非常好的例子,通过TransactionManager来对事务管理接口进行统一抽象化,客户端透明的使用PlatformTransactionManager这一策略接口进行事务界定,及时具体的事务策略需要变更,对客户端代码来说影响也会很小。

Spring还在很多地方用到了策略模式:

  • bean实例化过程中,会根据情况决定使用反射还是cglib,InstantiationStrategy是容器使用的实例化策略的抽象接口,Spring默认提供了SimpleInstantiationStrategy和CglibSubclassingInstantiationStrategy两个具体实现类。
  • Spring的Validation框架中,Validator定义也是一个策略接口,具体实现类根据具体场景提供不同的验证逻辑。

还包括日志框架,日志门面就可以看做是一个抽象策略接口,而日志框架的具体实现就可以看做是不同的日志框架选择策略。

总结: 当针对同一件事情有多种选择的时候,我们都可以考虑用策略模式来统一抽象接口,为客户端代码造福。当然,也不要死记硬背策略模式,灵活使用,包括灵活转换,才是王道。

策略模式中的策略又分为单一策略和动态策略(这是我自己进行的分类)

  • 单一策略: 整个客户端程序运行过程中只会依赖于一种单一的策略,例如: Spring提供的事务,使用PlatformTransactionManager进行事务界定的客户端代码,在其整个生命周期内只依赖于一个PlatformTransactionManager实现类,这样的情况很好处理,直接为客户端代码注入所需要的策略实现类即可。
  • 动态策略: 客户端生命周期内可能会动态依赖多个策略。比如上面的支付场景,客户端可能会不断的变换策略,那么这种情况下我们是需要提前准备好相关策略的映射关系,一般在程序运行过程中动态切换。

分布式事务

对分布式事务不了解的,建议先阅读下面这篇文章:

Mysql分布式事务

Spring的JtaTransactionManager使用到了JDNI技术,不了解的可以去看看:

JNDI是什么,怎么理解

spring事务管理器的顶级抽象是PlatformTransactionManager接口,其提供了个重要的实现类:

  • DataSourceTransactionManager:用于实现本地事务
  • JTATransactionManager:用于实现分布式事务

JTATransactionManager是下面的重点,JTATransactionManager只是对各种具体的JTA实现产品提供的分布式事务功能进行封装,JTATransactionManager会将最终的工作委派给具体的JTA实现来做。因此,最终我们还是要追到JTA规范以及JTA实现中。

首先,具体的事务资源,RDBMS,MessageQueue等,想要加入JTA管理的分布式事务,JTA规范要求其实现javax.transaction.xa.XAResource接口,所以,希望加入JTA管理的分布式事务的RM通常会提供相应的适配器,用于提供基于XAResource的分布式事务交互能力,比如关系数据库提供的支持XA的JDBC驱动程序。

当需要参与分布式事务的RM都拥有了XAResource支持后,JTATransactionManager与RM之间就可以使用二阶段提交协议进行通信。

但是具体的JTATransactionManager与RM之间的联系要由ApplicationServer进行协调,ApplicationServer是应用程序运行的容器,实现了JTA规范。

JTATransactionManager与各个RM之间的交互,整个过程如下:

通信过程具体如下:

  • ApplicationServer一开始通过JNDI绑定它的JTA实现中的UserTransaction或者TransactionManager具体实现类,这样,客户端应用程序就可以通过JNDI获取他们。

  • ApplicationServer内部要求TransactionManager为当前事务分配一个唯一的标志(XID),然后开启事务,并且会将当前事务绑定到当前线程。

  • AppServer会通过跟RM的适配器获取一个事务资源对象,我们暂且称之为TransactionResource,该资源对象包含两部分,一部分是JtaTransactionManager需要的XAResource,另一部分是程序需要使用的Connection资源,取得TransactionResource之后,AppServer需要做下面两件事情:

  • AppServer从 TransactionResource取出XAResource后,交给TransactionManager,TransactionManager通过该XAResource与RM进行关于二阶段提交协议的交互。 实际上,现在TransactionManager只是调用XAResource的start(xid)方法通知RM开始记录。

  • AppServer然后再把与XAResource属于同一个TransactionalResource的Connection传递给客户端应用程序使用,然后客户端应用程序就可以使用AppServer传给的Connection进行数据访问操作。

  • 数据访问操作完成后,关闭之前AppServer传给的Connection,AppServer在对应Conn被关闭后,会通知TransactionManager,由其调用对应的XAResource的end(xid)结束事务记录。

  • 当客户端通过UseTransaction或者TransactionManager的响应方法要求结束事务的时候,AppServer就会通知TransactionManager使用两阶段提交协议提交当前事务。

  • TransactionManager调用XAResource的prepare(xid)方法通知各个RM准备提交事务

  • 如果各个XAResource回答都是OK,TransactionManager调用XAResource的commit(xid)方法通知各个RM最终提交事务。

AppServer屏蔽了各个RM具体参与JtaTransactioNmanager所管理的分布式事务细节,客户端应用程序通过服务器的JNDI获取DataSource等资源,是因为只有使用AppServer公开的与XAResource绑定到同一TransactionResource的Connection,才能保证客户端应用程序所做的数据访问操作都能加入到AppServer协调的分布式事务中。

AppServer为客户端应用程序公开与当前分布式事务相关的Conn方式,就是实现一个DataSource,然后把该DataSource绑定到JNDI,这样,客户端应用程序就可以通过从JNDI中取得DataSource,获取与事务相关的Connection了。

不过,如果我们使用的JTA实现不是相应的AppServer提供的,比如,可以独立使用的Atomikos或者JOTM等JTA实现,要求我们从应用服务器的JNDI服务取得相应的DataSource这一前提是不成立的,这时,我们直接使用各个JTA产品提供的DataSource封装类进行数据访问即可,与AppServer屏蔽RM与TransactionManager之间的关系一样,这些产品也有与AppServer完成同样工作的角色,为我们关联具体的RM预当前产品的TransactionManager。

整体架构概览

对于Spring而言,其只提供了对分布式事务的顶层类封装,方便将其纳入原本的Spring的事务管理中,该类就是JtaTransactionManager

AbstractPlatformTransactionManager之前说过,该抽象类负责对PlatformTransactionManager中提供的接口进行实现,但是其中commit,getTransaction,rollback等部分,其中真正去执行这些操作的时候,会留给子类去实现相关更小范围的接口

  • JtaTransactionManager和之前讲的DataSourceTransactionManager一样,实现了AbstractPlatformTransactionManager留给子类的相关接口

获取一个事务对象:

@Override
	protected Object doGetTransaction() {
	//UserTransaction 就是不同的分布式事务实现库提供的
		UserTransaction ut = getUserTransaction();
		if (ut == null) {
			throw new CannotCreateTransactionException("No JTA UserTransaction available - " +
					"programmatic PlatformTransactionManager.getTransaction usage not supported");
		}
		//如果我们没有手动去设置,那么这里会通过JDNI去查询
		if (!this.cacheUserTransaction) {
			ut = lookupUserTransaction(
					this.userTransactionName != null ? this.userTransactionName : DEFAULT_USER_TRANSACTION_NAME);
		}
		//真正去获取一个事务
		return doGetJtaTransaction(ut);
	}
    
    //封装为一个JtaTransactionObject事务对象后返回
    protected JtaTransactionObject doGetJtaTransaction(UserTransaction ut) {
		return new JtaTransactionObject(ut);
	}

开启一个事务:

@Override
	protected void doBegin(Object transaction, TransactionDefinition definition) {
	//拿到传入的事务对象
		JtaTransactionObject txObject = (JtaTransactionObject) transaction;
		try {
		//开启分布式事务
			doJtaBegin(txObject, definition);
		}
		catch (NotSupportedException | UnsupportedOperationException ex) {
			throw new NestedTransactionNotSupportedException(
					"JTA implementation does not support nested transactions", ex);
		}
		catch (SystemException ex) {
			throw new CannotCreateTransactionException("JTA failure on begin", ex);
		}
	}

   	protected void doJtaBegin(JtaTransactionObject txObject, TransactionDefinition definition)
			throws NotSupportedException, SystemException {
        //设置隔离级别
		applyIsolationLevel(txObject, definition.getIsolationLevel());
		int timeout = determineTimeout(definition);
		//设置超时时间
		applyTimeout(txObject, timeout);
		//获取到相关分布式事务的UserTransaction,开启一个分布式事务
		txObject.getUserTransaction().begin();
	}

对于atomikos而言,其会调用UserTransactionImp中的TransactionManagerImp来真正开启一个分布式事务

JtaTransactionManager提供的其他接口也是类似操作,最终都是调用引入的分布式事务实现库中相关方法,完成最终分布式事务的操作,而Spring本身提供的JtaTransactionManager并不具备对分布式事务处理的能力。

相关文章