本系列文章:
之前我们已经深度剖析了Spring局部事务的实现,但是懂了实现还不够,我们需要从Spring的实现中学习到一些好用的设计思想,本篇文章就是来总结一下Spring事务设计的精华思想。
之前讲过,Spring通过ThreadLocal来避免在业务代码中进行Connetion传递的尴尬局面。但是ThreadLocal作用不只于此,或者说Spring团队使用ThreadLocal保存Connection,不只只是为了完成connection-passing,还有其他考量,那么是什么呢?
想要搞清楚,前提是理解ThreadLocal具体的作用有哪些:
看上面这幅图,我们知道Connection是一个具有状态的对象,即它是非线程安全的对象,在这种多线程并发访问情况下必定存在问题,因此对于某些有状态的或者非线程安全的对象,我们可以在多线程程序中为每个对象分配相应的副本,而不是让多线程共享该类型的某个对象。
所以为了保证多个线程使用Connection进行数据访问过程中的数据安全,我们通过ThreadLocal为每个线程分配了一个他们各自持有的Connection,从而避免了对单一Connection资源的争用。
在JDBC中是一个Connection对应一个事务,如果所有的线程共用一个Connection的话,事务管理就失控了。
这种场景下,我们更多关注的是数据在单一线程环境下的数据流动过程。
例如: 我们可以通过ThreadLocal来跟踪保存在线程内的日志序列,在程序执行的任务必要执行点将系统跟踪信息添加到ThreadLocal,在何时的时间点拿出来分析。
我们还可以通过ThreadLocal保存某种全局变量,在当前线程执行到某个时间点时取出进行设置,在另外一个时间点取出进行判断。
下面给出一个具体使用场景:
采用ThreadLocal在单一线程内传递数据,可以避免耦合性很强的方法参数形式的数据传递方式。其实有点像数据在随着当前线程执行过程不断流动一样。
虽然我们将资源设置到了ThreadLocal上,但是ThreadLocal本身并不会保存这些特定的数据资源,这些资源都是绑定到了特定的线程上面。
每个Thread类都有一个map集合用来保存绑定到当前线程上的资源:
//默认为空,未进行初始化
ThreadLocal.ThreadLocalMap threadLocals = null;
当我们通过ThreadLocal的set方法来设置数据的时候:
public void set(T value) {
//取出当前线程
Thread t = Thread.currentThread();
//获取当前线程的threadLocals变量
ThreadLocalMap map = getMap(t);
//如果已经初始化了,那么直接往map里面设置值
//key就是当前ThreadLocal,value就是我们要绑定的资源
if (map != null)
map.set(this, value);
else
//否则就初始化map,并且将value放入map集合中去
createMap(t, value);
}
ThreadLocalMap getMap(Thread t) {
//返回的是当前线程的threadLocals属性
return t.threadLocals;
}
void createMap(Thread t, T firstValue) {
//初始map,key还是当前threadLocal,value就是上面传入的资源
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
当我们通过get方法从ThreadLocal获取值时:
public T get() {
//获取当前线程
Thread t = Thread.currentThread();
//获取当前线程的threadLocals属性
ThreadLocalMap map = getMap(t);
//如果已经初始化过了
if (map != null) {
//如果能够取出当前ThreadLocal在map中关联的资源,那么直接返回
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
//设置并返回一个初始值
return setInitialValue();
}
private T setInitialValue() {
//初始化来自与initialValue()方法的返回值
T value = initialValue();
//如果map还没有初始化就初始化,然后将初始值设置进去
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
//初始值默认为null,一般都建议重写该方法
protected T initialValue() {
return null;
}
看完了ThreadLocal的实现原理,那么上面最开始给出的Conntion案例图应该改成下面这样:
前面,我们介绍过了如何通过AbstractRoutingDataSource 实现一个简单的多数据源切换
Spring中的DataSource
但是在多数据源切换过程中,切换的条件可能随着应用程序的需求和各异,而且,通常不会像我们的AbstractRoutingDataSource 原型那样简单,只需要内部条件就可以实现数据源切换的判断。
更多时候,需要外部条件的介入,这就会有一个问题,如何为AbstractRoutingDataSource 的实现子类传入这些外部条件相关的数据? ---->没错,就是ThreadLocal.
@Override
public Connection getConnection(String username, String password) throws SQLException {
return determineTargetDataSource().getConnection(username, password);
}
protected DataSource determineTargetDataSource() {
Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
//determineCurrentLookupKey是我们需要重写的方法
Object lookupKey = determineCurrentLookupKey();
DataSource dataSource = this.resolvedDataSources.get(lookupKey);
if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
//resolvedDefaultDataSource是兜底数据源
dataSource = this.resolvedDefaultDataSource;
}
if (dataSource == null) {
throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
}
return dataSource;
}
@Nullable
private Map<Object, Object> targetDataSources;
@Nullable
private Object defaultTargetDataSource;
之所以我们上面看到的都是下面两个属性,其实他们都是一样的,只不过在afterPropertiesSet方法中会进行校验,然后提前解析一波,把上面两个属性,变成解析后的集合 (如果还不懂的话,可以自己看一下该类源码实现,核心思路已经讲了,其余的代码就比较简单了)
@Nullable
private Map<Object, DataSource> resolvedDataSources;
@Nullable
private DataSource resolvedDefaultDataSource;
有了上面的回顾之后,下面我们想一想,如何通过ThreadLocal完成外部条件的介入,从而通过外部条件来实现数据源的动态切换呢?
我们可以通过ThreadLocal保存每个数据源对应的标志,然后在determineCurrentLookupKey方法中通过获取ThreadLocal中保存的标志,决定切换到哪一个数据源。
这里简单举个例子:
public class DataSourceUtil {
/**
* helper数据库
*/
public static final Integer HELPER=1;
/**
* training数据库
*/
public static final Integer TRAINING=2;
/**
* 存放数据源标志--默认返回helper数据源
*/
private static final ThreadLocal<Integer> DATASOURCE=ThreadLocal.withInitial(()->HELPER);
static public Integer get(){
return DATASOURCE.get();
}
static public void set(Integer val){
DATASOURCE.set(val);
}
}
@Data
public class DynamicDataSource extends AbstractRoutingDataSource {
public DynamicDataSource() {
//设置默认的数据源
super.setDefaultTargetDataSource(DataSourceUtil.get());
HashMap<Object, Object> dataSources = new HashMap<>();
dataSources.put(DataSourceUtil.HELPER,DataSourceTestHandler.getHelperDataSource());
dataSources.put(DataSourceUtil.TRAINING,DataSourceTestHandler.getTrainingDataSource());
//设置目标数据源集合
super.setTargetDataSources(dataSources);
}
@Override
protected Object determineCurrentLookupKey() {
return DataSourceUtil.get();
}
}
剩下的就是将DynamicDataSource 注入容器即可,然后我们通过DataSourceUtil.set()方法就可以根据外部条件,完成数据源的动态切换了,这里就不演示了
策略模式本意是封装一系列可以互相替换的算法逻辑,可以选择让用户自己选择使用哪一种算法,也可以像用户屏蔽算法的多样性,程序自动判定选择最适合当前条件下的算法。
不理解策略模式的可以先看一下下面这篇文章:
这里用支付做一个简单的例子: 用户在支付的时候通常有多种支付方式的选择,常见的有微信支付和支付宝支付,如果要写支付过程的代码,我猜有人会这样写:
public class PayService {
private static final Integer ZHI_FU_BAO_PAY=1;
private static final Integer WEI_XIN_PAY=2;
public Boolean pay(Integer money,Integer type){
if(type.equals(ZHI_FU_BAO_PAY)){
//...
return true;
}else if(type.equals(WEI_XIN_PAY)){
//...
return true;
}else {
throw new UnsupportedOperationException("不支持的支付方式");
}
}
}
上面这种写法适合当前场景吗?
显然,不适合,大家可以思考,如果此时再添加进来银行卡支付,或者其他支付方式,那么每添加一次就需要增加一个分支。
并且pay方法主要精力应该放在支付上,而不是挑选支付方式上面,因此这里需要采用策略模式来优化一下:
来看一下具体应该怎么写吧:
public interface PayWay {
boolean pay(Integer money);
}
public class BankCardPayImpl implements PayWay{
@Override
public boolean pay(Integer money) {
System.out.println("使用银行卡支付: "+money);
return true;
}
}
public class WeiXinPayImpl implements PayWay{
@Override
public boolean pay(Integer money) {
System.out.println("使用微信支付: "+money);
return true;
}
}
public class ZhiFuBaoPayImpl implements PayWay{
@Override
public boolean pay(Integer money) {
System.out.println("使用支付宝支付: "+money);
return true;
}
}
准备好了相关策略实例之后,下面大家需要思考如何完成算法挑选的过程,即我们需要知道算法和标记的映射关系。
这里有两个思路:
先来看一下用集合保存映射关系的方式:
public class PayContext {
private PayWay payWay;
public static final Integer ZHI_FU_BAO_PAY=1;
public static final Integer WEI_XIN_PAY=2;
public static final Integer BANK_CARD_PAY=3;
private static final Map<Integer,PayWay> payWayMap=new HashMap<>();
static {
payWayMap.put(ZHI_FU_BAO_PAY,new ZhiFuBaoPayImpl());
payWayMap.put(WEI_XIN_PAY,new WeiXinPayImpl());
payWayMap.put(BANK_CARD_PAY,new BankCardPayImpl());
}
public boolean doPay(Integer money,Integer type){
payWay=payWayMap.get(type);
if(payWay==null){
throw new UnsupportedOperationException("不支持的支付类型");
}
return payWay.pay(money);
}
}
public class PayService {
PayContext payContext=new PayContext();
public Boolean pay(Integer money,Integer type){
return payContext.doPay(money,type);
}
}
如果采用多重条件语句的话:
public class PayContext {
private PayWay payWay;
public static final Integer ZHI_FU_BAO_PAY=1;
public static final Integer WEI_XIN_PAY=2;
public static final Integer BANK_CARD_PAY=3;
public boolean doPay(Integer money,Integer type){
if(type.equals(ZHI_FU_BAO_PAY)){
System.out.println("支付宝支付: "+money);
}else if(type.equals(WEI_XIN_PAY)){
System.out.println("微信支付: "+money);
}else if(type.equals(BANK_CARD_PAY)){
System.out.println("银行卡支付: "+money);
}else {
throw new UnsupportedOperationException("不支持的支付类型");
}
return payWay.pay(money);
}
}
显然这里更适合与第一种集合存放映射关系的方式,因为付款的选择可能会动态增加或者减少,而对于多重循环来说,它更适合与选择较为固定并且选择数量比较少的情况,所以要实事求是,没有哪一种写法更好,只有哪一种写法更符合当前需求
虽然策略模式定义上强调是对算法的封装,但是我们不应该被算法所禁锢,实际上,只要能够有效地剥离客户端代码与特定关注点之间的依赖关系,策略模式就应该被考虑。
Spring事务就是一个非常好的例子,通过TransactionManager来对事务管理接口进行统一抽象化,客户端透明的使用PlatformTransactionManager这一策略接口进行事务界定,及时具体的事务策略需要变更,对客户端代码来说影响也会很小。
Spring还在很多地方用到了策略模式:
还包括日志框架,日志门面就可以看做是一个抽象策略接口,而日志框架的具体实现就可以看做是不同的日志框架选择策略。
总结: 当针对同一件事情有多种选择的时候,我们都可以考虑用策略模式来统一抽象接口,为客户端代码造福。当然,也不要死记硬背策略模式,灵活使用,包括灵活转换,才是王道。
策略模式中的策略又分为单一策略和动态策略(这是我自己进行的分类)
对分布式事务不了解的,建议先阅读下面这篇文章:
Spring的JtaTransactionManager使用到了JDNI技术,不了解的可以去看看:
spring事务管理器的顶级抽象是PlatformTransactionManager接口,其提供了个重要的实现类:
JTATransactionManager是下面的重点,JTATransactionManager只是对各种具体的JTA实现产品提供的分布式事务功能进行封装,JTATransactionManager会将最终的工作委派给具体的JTA实现来做。因此,最终我们还是要追到JTA规范以及JTA实现中。
首先,具体的事务资源,RDBMS,MessageQueue等,想要加入JTA管理的分布式事务,JTA规范要求其实现javax.transaction.xa.XAResource接口,所以,希望加入JTA管理的分布式事务的RM通常会提供相应的适配器,用于提供基于XAResource的分布式事务交互能力,比如关系数据库提供的支持XA的JDBC驱动程序。
当需要参与分布式事务的RM都拥有了XAResource支持后,JTATransactionManager与RM之间就可以使用二阶段提交协议进行通信。
但是具体的JTATransactionManager与RM之间的联系要由ApplicationServer进行协调,ApplicationServer是应用程序运行的容器,实现了JTA规范。
JTATransactionManager与各个RM之间的交互,整个过程如下:
通信过程具体如下:
ApplicationServer一开始通过JNDI绑定它的JTA实现中的UserTransaction或者TransactionManager具体实现类,这样,客户端应用程序就可以通过JNDI获取他们。
ApplicationServer内部要求TransactionManager为当前事务分配一个唯一的标志(XID),然后开启事务,并且会将当前事务绑定到当前线程。
AppServer会通过跟RM的适配器获取一个事务资源对象,我们暂且称之为TransactionResource,该资源对象包含两部分,一部分是JtaTransactionManager需要的XAResource,另一部分是程序需要使用的Connection资源,取得TransactionResource之后,AppServer需要做下面两件事情:
AppServer从 TransactionResource取出XAResource后,交给TransactionManager,TransactionManager通过该XAResource与RM进行关于二阶段提交协议的交互。 实际上,现在TransactionManager只是调用XAResource的start(xid)方法通知RM开始记录。
AppServer然后再把与XAResource属于同一个TransactionalResource的Connection传递给客户端应用程序使用,然后客户端应用程序就可以使用AppServer传给的Connection进行数据访问操作。
数据访问操作完成后,关闭之前AppServer传给的Connection,AppServer在对应Conn被关闭后,会通知TransactionManager,由其调用对应的XAResource的end(xid)结束事务记录。
当客户端通过UseTransaction或者TransactionManager的响应方法要求结束事务的时候,AppServer就会通知TransactionManager使用两阶段提交协议提交当前事务。
TransactionManager调用XAResource的prepare(xid)方法通知各个RM准备提交事务
如果各个XAResource回答都是OK,TransactionManager调用XAResource的commit(xid)方法通知各个RM最终提交事务。
AppServer屏蔽了各个RM具体参与JtaTransactioNmanager所管理的分布式事务细节,客户端应用程序通过服务器的JNDI获取DataSource等资源,是因为只有使用AppServer公开的与XAResource绑定到同一TransactionResource的Connection,才能保证客户端应用程序所做的数据访问操作都能加入到AppServer协调的分布式事务中。
AppServer为客户端应用程序公开与当前分布式事务相关的Conn方式,就是实现一个DataSource,然后把该DataSource绑定到JNDI,这样,客户端应用程序就可以通过从JNDI中取得DataSource,获取与事务相关的Connection了。
不过,如果我们使用的JTA实现不是相应的AppServer提供的,比如,可以独立使用的Atomikos或者JOTM等JTA实现,要求我们从应用服务器的JNDI服务取得相应的DataSource这一前提是不成立的,这时,我们直接使用各个JTA产品提供的DataSource封装类进行数据访问即可,与AppServer屏蔽RM与TransactionManager之间的关系一样,这些产品也有与AppServer完成同样工作的角色,为我们关联具体的RM预当前产品的TransactionManager。
对于Spring而言,其只提供了对分布式事务的顶层类封装,方便将其纳入原本的Spring的事务管理中,该类就是JtaTransactionManager
AbstractPlatformTransactionManager之前说过,该抽象类负责对PlatformTransactionManager中提供的接口进行实现,但是其中commit,getTransaction,rollback等部分,其中真正去执行这些操作的时候,会留给子类去实现相关更小范围的接口
获取一个事务对象:
@Override
protected Object doGetTransaction() {
//UserTransaction 就是不同的分布式事务实现库提供的
UserTransaction ut = getUserTransaction();
if (ut == null) {
throw new CannotCreateTransactionException("No JTA UserTransaction available - " +
"programmatic PlatformTransactionManager.getTransaction usage not supported");
}
//如果我们没有手动去设置,那么这里会通过JDNI去查询
if (!this.cacheUserTransaction) {
ut = lookupUserTransaction(
this.userTransactionName != null ? this.userTransactionName : DEFAULT_USER_TRANSACTION_NAME);
}
//真正去获取一个事务
return doGetJtaTransaction(ut);
}
//封装为一个JtaTransactionObject事务对象后返回
protected JtaTransactionObject doGetJtaTransaction(UserTransaction ut) {
return new JtaTransactionObject(ut);
}
开启一个事务:
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
//拿到传入的事务对象
JtaTransactionObject txObject = (JtaTransactionObject) transaction;
try {
//开启分布式事务
doJtaBegin(txObject, definition);
}
catch (NotSupportedException | UnsupportedOperationException ex) {
throw new NestedTransactionNotSupportedException(
"JTA implementation does not support nested transactions", ex);
}
catch (SystemException ex) {
throw new CannotCreateTransactionException("JTA failure on begin", ex);
}
}
protected void doJtaBegin(JtaTransactionObject txObject, TransactionDefinition definition)
throws NotSupportedException, SystemException {
//设置隔离级别
applyIsolationLevel(txObject, definition.getIsolationLevel());
int timeout = determineTimeout(definition);
//设置超时时间
applyTimeout(txObject, timeout);
//获取到相关分布式事务的UserTransaction,开启一个分布式事务
txObject.getUserTransaction().begin();
}
对于atomikos而言,其会调用UserTransactionImp中的TransactionManagerImp来真正开启一个分布式事务
JtaTransactionManager提供的其他接口也是类似操作,最终都是调用引入的分布式事务实现库中相关方法,完成最终分布式事务的操作,而Spring本身提供的JtaTransactionManager并不具备对分布式事务处理的能力。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://cjdhy.blog.csdn.net/article/details/125232126
内容来源于网络,如有侵权,请联系作者删除!