事务就是一个原子性操作,该操作内部的一组命令,要么都执行成功,要么都执行失败
事务的大家族中常会出现下面几个重要的家庭成员:
我们通常会按照事务中涉及的RM的多寡将事务分为两类,即全局事务和局部事务.
如果整个事务处理过程中存在多个RM,那么就需要通过TP Monitor来协调多RM间的事务一致性。TP Monitor通过两阶段提交协议来确保整个事务的ACID属性。
通常这种场景下的事务,被称为全局事务或者分布式事务。
所有应用程序提交的事务请求,都需要通过TP Monitor的调配之后,直接由TM统一管理,TM将使用两阶段提交协议来协调多RM之间的事务处理。
两阶段提交的过程可类比如下场景:
但是如果其中一方不愿意呢?
只有在双方都确认的条件下面,整个事务才能顺利提交,否则一旦一方反悔,事务就必须回滚到之前的状态
如果当前事务中只有一个RM参与其中,我们就可以称当前事务为局部事务。
比如,在当前事务中只对一个数据库进行更新,或者只向一个消息队列中发送消息的情况,都属于局部事务。
因为局部事务只存在一个RM,因此没有必要引入相应的TP Monitor来帮助协调管理多个RM之间的事务处理。
通常情况下,相应的RM内部都有内置的事务支持,所以,在局部事务中,我们更倾向于之间使用RM的内置事务支持。
局部事务与全局事务的区分在于事务中涉及到的RM数量,而不是系统中实际有多少RM,因为即使系统中存在多个数据库(即RM),只要当前事务只更新一个数据库的数据,那么当前事务就应该算作局部事务,而不是全局事务。
对于局部事务而言,一般都直接使用RM内置的事务支持,当然也可以通过引入TP Monitor在分布式事务场景下进行事务管理(但是显然没必要)。
通过情况下,各个TP Monitor在实现的时候都会判断当前参与事务的RM数量,如果只有一个RM参与,那么会做一定优化处理,避免使用两阶段提交协议带来额外的性能损耗。
在Java的局部事务场景中,系统里事务管理的具体处理方式,会随着所使用的数据访问技术的不同而各异。
我们不是使用专用的事务API来管理事务,而是通过当前使用的数据访问技术所提供的基于connection的api来管理事务。
要对数据库的访问过程中的事务进行管理,每种数据访问技术都提供了特定与它自身的事务管理API,比如JDBC是Java平台访问关系数据库最基础的API。
如果直接使用JDBC进行数据访问的话,我们可以将数据库连接的自动提交设置为false,改为手动提交来控制整个事务的提交或者回滚。
public boolean giveMoney(String Giver,String Revicer,int money) {
....
try{
conn = JDBCUtil.getConnection();
//开启事务
conn.setAutoCommit(false);
....
//结束事务
conn.commit();
return true;
} catch (SQLException throwables) {
//事务进行回滚
conn.rollback();
....
}
过程大致如上
Java平台上的分布式事务管理,主要是通过JTA或者JCA提供支持的。
JTA是Sun公司提出的标准化分布式事务访问的Java接口规范。不过,JTA规范定义的知识一套Java的接口定义,具体的实现留给了相应的提供商去实现,各个Java EE应用服务器需要提供对JTA的支持。另外,除了可以使用绑定到Java EE应用服务器的JTA实现之外,Java平台也存在几个独立的并且比较成熟的JTA实现产品,包括:
JCA规范主要面向EIS的集成,通过为遗留的EIS系统和JAVA EE应用服务器指定统一的通信标准,二者可以实现各种服务上的互通。
显然,用过spring的小伙伴都体验过了spring的全套事务服务带来的舒爽体验,也解决了上面提到的这些问题,并且还更加强大,那么下面就来一起探究一下吧。
org.springframework.transaction.PlatformTransactionManager是Spring事务王国的核心接口,它规定了为应用程序提供事务界定的统一方式。
public interface PlatformTransactionManager extends TransactionManager {
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;
void commit(TransactionStatus status) throws TransactionException;
void rollback(TransactionStatus status) throws TransactionException;
}
PlatformTransactionManager是整个事务抽象策略的顶层接口,Spring的事务框架针对不同的数据访问方式以及全局事务场景,提供相应的实现类。
如果让你给出一个PlatformTransactionManager的自定义实现类,how do you do it ?
这里先用JDBC数据访问方式的局部事务为例:
我们通常是将事务管理放在Service层,而将数据访问逻辑放在Dao层。这样可以提高数据访问逻辑的重用性,并且在Service层根据相应的逻辑来决定是提交或者回滚事务。
因为JDBC的局部事务控制是需要通过同一个Connection来完成的,因此要保证两个DAO的数据访问处于同一个事务中,就需要保证它们使用的是同一个connection。
要完成这点,通常我们会采用connection-passing的方式,即为同一个事务中的各个dao的数据访问方法传递当前的Connection。
上面这种事务管控方式最大的问题在于事务代码无法摆脱connection的束缚,导致connection与当前业务代码耦合
我们应该将事务过程中用到的Connection实例放置于一个统一地点,这样就解除了事务管理代码和数据访问代码直接通过connection的耦合。
有一个办法是: 事务开始前取得一个connection,然后将这个connection绑定到当前调用线程。之后,数据访问对象在使用connection进行数据访问的时候,就可以从当前线程上获得这个事务开始时候绑定的connection实例。
当所有的数据访问都完成后,我们就可以将connection提交或者回滚事务,然后解除它到当前线程的绑定。
如果要让我们来自定义一个存放当前线程绑定的conn的类,那么大概会长下面这样:
public class TransactionResourceManager {
private static ThreadLocal resources=new ThreadLocal();
public static Object getResource(){
return resources.get();
}
public static void bindResource(Object resource){
resources.set(resource);
}
public static Object unbindResource(){
Object resource = getResource();
resources.set(null);
return resource;
}
}
对于我们要实现的针对JDBC的PlatformTransactionManager ,只需要在事务开始的时候,通过我们的TransactionResourceManager 将connection绑定到线程,然后再事务结束的时候解除绑定即可。
这里给出JdbcTransactionManager简单实现:
public class JdbcTransactionManager implements PlatformTransactionManager {
private DataSource dataSource;
public JdbcTransactionManager(DataSource dataSource) {
this.dataSource = dataSource;
}
@Override
public TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
try {
Connection connection = dataSource.getConnection();
connection.setAutoCommit(false);
TransactionResourceManager.bindResource(connection);
return new DefaultTransactionStatus(connection,true,true,false,true,null);
} catch (SQLException e) {
throw new CannotCreateTransactionException("cannot get connection for tx",e);
}
}
@Override
public void commit(TransactionStatus status) throws TransactionException {
Connection connection = (Connection) TransactionResourceManager.unbindResource();
try {
connection.commit();
} catch (SQLException e) {
throw new UnexpectedRollbackException("commit failed with SQLex",e);
}finally {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
@Override
public void rollback(TransactionStatus status) throws TransactionException {
Connection connection = (Connection) TransactionResourceManager.unbindResource();
try {
connection.rollback();
} catch (SQLException e) {
throw new UnexpectedRollbackException("rollback failed with SQLex",e);
}finally {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
当然上面给出的是简单的实现,spring可没有那么容易让你看透,上面给出的代码还存在以下问题:
spring提供的DataSourceUtils类就是用来完成对connection的管理,DataSourceUtils会从类似TransactionResourceManager 的类中获取connection资源,如果当前线程之前没有绑定任何connection,那么它就通过数据访问对象的DataSource引用获取新的connection,否则就是要绑定的那个Connection。
这就是为什么要强调,当我们使用spring提供的事务支持的时候,必须通过DataSourceUtils来获取连接,因为它提供了Spring事务管理框架在数据访问层需要提供的基础设置中不可或缺的一部分。
Spring的事务王国,有三大护法,他们三个共同维护着事务的一致性,他们分别是:
TransactionDefinition主要定义了事务属性相关配置:
TransactionDefinition内定义了如下五个常量用于标志可供选择的隔离级别:
事务的传播行为涉及到一个事务在执行过程中,调用涉及其他事务时,相关事务的表现行为,如下图所示:
TransactionDefinition为事务的传播行为定义了下面几种选择:
注意区分PROPAGATION_NESTED和PROPAGATION_REQUIRES_NEW:
TransactionDefinition提供了TIMEOUT_DEFAULT常量定义,用来指定事务的超时时间。该值默认为-1,这会采用当前事务系统默认的超时时间,如果 底层的transaction manager不支持TIMEOUT_DEFAULT,那么必须将TIMEOUT_DEFAULT设置为-1,否则会抛出异常
TransactionDefinition提供的最后一个功能是是否将要创建一个只读的事务,如果 底层的transaction manager不支持只读事务,那么就不会理睬该设定
基本的事务属性我都在底层接口规定好,但是当我们需要切换不同规定底层实现时,底层的transaction manager不一定支持所有的属性,也可能有自己特有的属性控制
TransactionDefinition继承结构图如下:
编程式事务侧重于通过程序员的编码方式来处理事务,而声明式事务则侧重于通过一个aop切面加上声明式事务提供的事务属性,来暗地中处理事务,程序员只需要指定相关事务属性即可。
DefaultTransactionDefinition是其默认实现,他提供了各种事务属性的默认值,并且通过它的setter方法,我们可以更改这些默认值。
这些默认值包括:
private int propagationBehavior = PROPAGATION_REQUIRED;
private int isolationLevel = ISOLATION_DEFAULT;
private int timeout = TIMEOUT_DEFAULT;
private boolean readOnly = false;
TransactionTemplate是Spring提供的进行编程式事务管理的模板方法类,他直接继承了DefaultTransactionDefinition。
TransactionTemplate的设计思想和JDBCTemplate一致,是通过模板方法加回调接口的方式,将通用的事务处理代码,和资源管理封装为模板方法,而将需要变化的,并且需要被事务包裹的代码,以回调接口的形式传递出去。
该类中有两个核心方法,如下:
//TransactionCallback就是上面讲到的需要动态变化的回调接口,该接口内部封装了相关业务代码
public <T> T execute(TransactionCallback<T> action) throws TransactionException {
Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");
//CallbackPreferringPlatformTransactionManager是一个函数式接口,提供了一个execute方法
//这里直接在该回调接口内部完成完全事务管理工作,然后返回结果
if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
}
//进入模板方法阶段--------->
else {
//从事务管理器中拿到封装有当前事务状态的类
TransactionStatus status = this.transactionManager.getTransaction(this);
T result;
try {
//回调接口执行,但是会传入事务状态,根据事务的相关状态: 事务传播行为,事务隔离级别,事务超时时间等
//进行处理--具体要不要开启事务,是这里决定的
result = action.doInTransaction(status);
}
//出现异常: 回滚
catch (RuntimeException | Error ex) {
// Transactional code threw application exception -> rollback
rollbackOnException(status, ex);
throw ex;
}
catch (Throwable ex) {
// Transactional code threw unexpected exception -> rollback
rollbackOnException(status, ex);
throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
}
// 如果没有出现异常,就提交事务,然后返回处理结果
this.transactionManager.commit(status);
return result;
}
}
那么再来看看事务回滚这边是怎么写的:
private void rollbackOnException(TransactionStatus status, Throwable ex) throws TransactionException {
Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");
logger.debug("Initiating transaction rollback on application exception", ex);
try {
//通过事务管理器根据事务状态回滚事务
this.transactionManager.rollback(status);
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
}
TransactionAttribute是继承自TransactionDefinition的接口定义,主要面向使用Spring AOP进行声明式事务管理的场合,它新增了一个rollbackon方法。
/**
*
*/
boolean rollbackOn(Throwable ex);
spring 3.0和spring 5.0之后有新增了两个方法:
/**
* Return a qualifier value associated with this transaction attribute.
* This may be used for choosing a corresponding transaction manager
* to process this specific transaction.
* 用来挑选一个事务管理器来执行当前事务
*/
@Nullable
String getQualifier();
/**
* Return labels associated with this transaction attribute.
* This may be used for applying specific transactional behavior
* or follow a purely descriptive nature.
* 通过标签属性,可以应用具体的事务行为
*/
Collection<String> getLabels();
DefaultTransactionAttribute是TransactionAttribute的默认实现,他同时继承了DefaultTransactionDefinition。
它内部有四个属性,如下:
//描述符
@Nullable
private String descriptor;
//超时时间的设置
@Nullable
private String timeoutString;
//用于匹配事务管理器
@Nullable
private String qualifier;
//标签
private Collection<String> labels = Collections.emptyList();
在DefaultTransactionDefinition的基础上新增了rollbackon的实现。DefaultTransactionDefinition的实现指定了,当异常类型为unchecked exception的情况下将会回滚事务。
@Override
public boolean rollbackOn(Throwable ex) {
return (ex instanceof RuntimeException || ex instanceof Error);
}
DefaultTransactionAttribute下有两个实现类:
DelegatingTransactionAttribute是抽象类,它会将所有方法的调用委派给另一个具体的TransactionAttribute实现类,比如DefaultTransactionAttribute或者RuleBasedTransactionAttribute 。
public abstract class DelegatingTransactionAttribute extends DelegatingTransactionDefinition
implements TransactionAttribute, Serializable {
private final TransactionAttribute targetAttribute;
public DelegatingTransactionAttribute(TransactionAttribute targetAttribute) {
super(targetAttribute);
this.targetAttribute = targetAttribute;
}
@Override
@Nullable
public String getQualifier() {
return this.targetAttribute.getQualifier();
}
@Override
public Collection<String> getLabels() {
return this.targetAttribute.getLabels();
}
@Override
public boolean rollbackOn(Throwable ex) {
return this.targetAttribute.rollbackOn(ex);
}
}
下面是TransactionStatus的类继承图:
从名字就可以推断出来,TransactionStatus是记录整个事务处理过程中的事务状态,更多的时候,我们将在编程式事务中使用该接口。
下面先来看看TransactionStatus继承的那些接口都提供了什么作用吧:
//一般事务执行完毕后,savepoint会被自动释放
//savepoint一般搭配嵌套事务作为传播行为使用
public interface SavepointManager {
/**
在当前事务中,创建一个savepoint,如果发生异常,可以选择回滚到savepoint
*/
Object createSavepoint() throws TransactionException;
/**
回滚到savepoint
*/
void rollbackToSavepoint(Object savepoint) throws TransactionException;
/**
手动释放savepoint
*/
void releaseSavepoint(Object savepoint) throws TransactionException;
}
如果使用过mysql的savepoint的小伙伴应该知道上面再说什么,不知道的可以先去查资源
public interface TransactionExecution {
/**
* 是否是新事务,或者参与到一个现存的事务中,或者不参与任务事务
*/
boolean isNewTransaction();
/**
* rollbackOnly标记当前事务让其进行回滚
*/
void setRollbackOnly();
boolean isRollbackOnly();
/**
* 当前事务是否结束了,即是否被提交或者回滚了
*/
boolean isCompleted();
}
//Flushable接口就是写出数据
public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {
/**
当前事务内部是否设置了savepoint,如果设置了说明当前事务是嵌套事务
*/
boolean hasSavepoint();
/**
* 将命令发送到服务端
*/
@Override
void flush();
}
AbstractTransactionStatus是TransactionStatus的抽象实现。
首先,该抽象类内部维护了三个属性:
//当前事务是否需要回滚
private boolean rollbackOnly = false;
//事务是否完成的标识
private boolean completed = false;
//父类saveManager管理savepoint
@Nullable
private Object savepoint;
//---------------------------------------------------------------------
// Implementation of TransactionExecution
//---------------------------------------------------------------------
@Override
public void setRollbackOnly() {
this.rollbackOnly = true;
}
@Override
public boolean isRollbackOnly() {
return (isLocalRollbackOnly() || isGlobalRollbackOnly());
}
public boolean isLocalRollbackOnly() {
return this.rollbackOnly;
}
/**
* 本类总是返回false,具体由子类重写来决定
*/
public boolean isGlobalRollbackOnly() {
return false;
}
/**
* Mark this transaction as completed, that is, committed or rolled back.
*/
public void setCompleted() {
this.completed = true;
}
@Override
public boolean isCompleted() {
return this.completed;
}
// Implementation of SavepointManager
//---------------------------------------------------------------------
//子类内部会维护一个SavepointManager变量,然后相关的savpoint管理方法都有该内部实例完成
//这里体现出了代理的思想
@Override
public Object createSavepoint() throws TransactionException {
return getSavepointManager().createSavepoint();
}
@Override
public void rollbackToSavepoint(Object savepoint) throws TransactionException {
getSavepointManager().rollbackToSavepoint(savepoint);
}
@Override
public void releaseSavepoint(Object savepoint) throws TransactionException {
getSavepointManager().releaseSavepoint(savepoint);
}
//子类重写实现
protected SavepointManager getSavepointManager() {
throw new NestedTransactionNotSupportedException("This transaction does not support savepoints");
}
// Handling of current savepoint state
//---------------------------------------------------------------------
@Override
public boolean hasSavepoint() {
return (this.savepoint != null);
}
protected void setSavepoint(@Nullable Object savepoint) {
this.savepoint = savepoint;
}
@Nullable
protected Object getSavepoint() {
return this.savepoint;
}
public void createAndHoldSavepoint() throws TransactionException {
setSavepoint(getSavepointManager().createSavepoint());
}
public void rollbackToHeldSavepoint() throws TransactionException {
Object savepoint = getSavepoint();
if (savepoint == null) {
throw new TransactionUsageException(
"Cannot roll back to savepoint - no savepoint associated with current transaction");
}
getSavepointManager().rollbackToSavepoint(savepoint);
getSavepointManager().releaseSavepoint(savepoint);
setSavepoint(null);
}
public void releaseHeldSavepoint() throws TransactionException {
Object savepoint = getSavepoint();
if (savepoint == null) {
throw new TransactionUsageException(
"Cannot release savepoint - no savepoint associated with current transaction");
}
getSavepointManager().releaseSavepoint(savepoint);
setSavepoint(null);
}
其实可以看出来,该抽象类中关于savePoint的相关管理方法,都只是一层代理壳子,最终都转交给了内部维护的SavepointManager对象实例代为执行
AbstractTransactionStatus下面还有两个子类:
SimpleTransactionStatus主要用于测试,这里直接略过了。
该类中有如下六个属性:
@Nullable
private final Object transaction;
private final boolean newTransaction;
private final boolean newSynchronization;
private final boolean readOnly;
private final boolean debug;
@Nullable
private final Object suspendedResources;
//其余就是上面这些属性的get和set方法了
DefaultTransactionStatus是Spring框架内部使用的主要TransactionStatus实现类。
Spring事务中的各个TransactionManager的实现,大都借助于DefaultTransactionStatus来记载事务状态信息。
public interface PlatformTransactionManager extends TransactionManager {
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;
void commit(TransactionStatus status) throws TransactionException;
void rollback(TransactionStatus status) throws TransactionException;
}
PlatformTransactionManager的整个抽象体系基于策略模式实现,由PlatformTransactionManager对事务界定进行统一抽象,而具体的界定策略的实现则交由具体的实现类.
PlatformTransactionManager的实现类可以划分为面向局部事务和面向全局事务两个分支:
Spring为各种数据访问技术提供了现成的PlatformTransactionManager实现支持。
别的都暂时不看,我们只看最熟悉的一个DataSourceTransactionManager,它对应的就是JDBC和MyBaits。
全局事务主要由JtaTransactionManager负责,JtaTransactionManager负责对各种JTA实现提供的分布式事务进行统一封装,只不过它的事务管理操作,最终都会委派给具体的JTA实现来完成。
在进行剖析之前,我们先理清楚Spring事务处理过程中涉及到的一些模型:
AbstractPlatformTransactionManager作为DataSourceTransactionManager的父类,以模板方法的形式封装了固定的事务处理逻辑,而将事务资源相关的操作以protected或者abstract方法的形式留给DataSourceTransactionManager来实现。
作为模板方法父类,AbstractPlatformTransactionManager替子类实现了一下固定的事务内部处理逻辑:
上面这些固定的事务处理逻辑,大都体现在以下几个模板方法中
该方法主要目的在于开启一个事务,但是在此之前需要帕努单一下之前是否已经存在了事务,如果存在,则根据TransactionDefinition 中的事务传播行为决定是挂起当前事务还是抛出异常。
同样的,不存在事务的情况下,也需要根据传播行为的具体语义来决定如何处理。
//TransactionDefinition主要规定了事务的传播行为和隔离级别,还有事务超时时间和是否为只读事务等
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
//传null,我就用默认的实现
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
//1.模板过程第一步: 开启一个事务
Object transaction = doGetTransaction();
....
不同的实现,获取的transaction 类型不一样,在这里DataSourceTransactionManager会返回DataSourceTransactionObject类型实例。
因此将获取transaction的过程抽象化,留个不同的子类去实现。
protected abstract Object doGetTransaction() throws TransactionException;
DataSourceTransactionManager在实现doGetTransaction方法逻辑的时候,会从TransactionSynchronizationManager获取绑定资源后添加到DataSourceTransactionObject中返回。
@Override
protected Object doGetTransaction() {
//DataSourceTransactionObject负责对connectionHolder进行一层包装
//在当前connection相关属性的记录
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
//TransactionSynchronizationManager定义很多ThreadLocal属性,保存当前线程执行事务过程中的一些属性
//这里尝试判断当前DataSource是否已经创建了一个connection
//如果已经创建了,说明存在事务了已经
//那么返回对应的connection,否则返回Null,表明是一个新事务
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
//设置ConnectionHolder进去
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
//传null,我就用默认的实现
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
//1.模板过程第一步: 开启一个事务
Object transaction = doGetTransaction();
//debugEnabled记录logger的debug是否可以打印,这样可以避免后续多次调用logger.isDebugEnabled()进行查询
//相当于一种缓存的思想
boolean debugEnabled = logger.isDebugEnabled();
//判断当前是否存在事务,根据不同的判定结果进行不同处理
//本类中isExistingTransaction方法默认返回false,子类需要重写该方法逻辑,以此来判断当前是否存在事务
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(def, transaction, debugEnabled);
}
...
对于DataSourceTransactionManager来说,判断当前是否存在事务的逻辑如下:
@Override
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
//通过ConnectionHolder判断当前是否存在活跃事务
//如果getTransaction中TransactionSynchronizationManager.getResource(obtainDataSource())
//返回值不为空并且事务活跃标记被设置了,说明存在事务了已经
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
如果isExistingTransaction返回true,即当前存在事务的情况下,由handleExistingTransaction方法统一处理当前存在事务。
具体如何处理,则应该按照TransactionDefinition中指定的传播行为分情况讨论。
分类讨论的代码如下:
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
//如果定义的传播行为是PROPAGATION_NEVER,抛出异常并退出
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
//如果传播行为为PROPAGATION_NOT_SUPPORTED,则挂起当前事务,然后返回
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
//挂起当前事务--保存当前事务一系列状态,并清空TransactionSynchronizationManager中
//各个ThreadLocal保存的当前事务的信息,为下面新事务腾出空间进行记录
//返回的suspendedResources保存了被挂起事务的各种信息,方便后面恢复挂起事务
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
//返回一个DefaultTransactionStatus实例,因为当我们挂起了当前事务,而PROPAGATION_NOT_SUPPORTED不需要事务
//所以,返回的DefaultTransactionStatus不包含transaction obj的信息
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
//如果传播形为是PROPAGATION_REQUIRES_NEW,则挂起当前事务,并开始一个新的事务并返回
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
//挂起当前事务
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
//开启一个新事务然后返回
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
//恢复之前挂起的事务--依靠suspendedResources进行事务的现场恢复
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
....
startTransaction方法源码如下:
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//抽象方法,需要具体子类来实现
doBegin(transaction, definition);
//如果是新事务被创建,那么需要重新设置一下TransactionSynchronizationManager中
//ThreadLocal属性们保存的信息,将其改为新事务的具体信息
prepareSynchronization(status, definition);
return status;
}
DataSourceTransactionManager中,doBegin()方法首先会检查传入的transaction,提取必要的信息判断之前是否存在绑定的conn信息,如果没有,则从DataSource中获取新的conn,然后将其AutoCommit状态设置为false,并绑定到TransactionSynchronizationManager。
当然,这期间也会涉及事务定义的应用以及条件检查等逻辑,在所有一切搞定后,newTransactionStatus会创建一个包含definition,transaction object以及挂起的事务信息和其他状态信息的DefaultTransactionStatus并返回。
DataSourceTransactionManager:
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
//hasConnectionHolder返回true说明不是新事务,而是已经存在了一个事务
//这里能够进去,说明是新事务被创建,或者 SynchronizedWithTransaction属性为true
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
//获取一个connection
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
//在这里txObject会被设置一个ConnectionHolder
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
//设置SynchronizedWithTransaction为true
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
//判断当前conn是否需要被设置为只读和当前conn隔离等级是否需要改变
//这里需要保存未改变前的隔离级别,方便在当前事务结束后,将隔离级别重置会原先的级别
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
//取消自动提交
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
//是否需要设置当前事务为只读事务
prepareTransactionalConnection(con, definition);
//设置事务活跃标记
txObject.getConnectionHolder().setTransactionActive(true);
//超时时间设置
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
//将connection holder绑定到当前线程---ThreadLocal
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
继续分析其他情况:
....
//如果事务传播行为为PROPAGATION_NESTED,则根据情况创建嵌套事务,如果通过savepoint或JTA的TransactionManager
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
//是否允许嵌套事务
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
//是否允许使用savepoint来创建嵌套事务
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
//使用JTATransactionManager来提供嵌套事务支持
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
return startTransaction(definition, transaction, debugEnabled, null);
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
...
下面是对事务状态进行匹配校验:
如果需要检查事务状态匹配情况,则对当前存在事务与传入的defintion中定义的隔离级别与ReadOnly属性进行检查,如果数据不吻合,则抛出异常:
....
//isValidateExistingTransaction默认为false
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
//剩下的就是其他情况下,直接构建TransactionStatu然后返回,比如对应PROPAGATION_SUPPORTS和PROPAGATION_REQUIRED
//别忘了此时是已经存在事务的情况下
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(def, transaction, debugEnabled);
}
//如果不存在事务,那么按照不同的传播行为,应该怎么处理:
// Check definition settings for new transaction.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
//如果传播行为是PROPAGATION_MANDATORY,则抛出异常,因为不存在当前事务
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
//如果传播行为为下面三者其一,则开启新的事务
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
先来看看rollback:
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
//回调TransactionSynchronization的接口
triggerBeforeCompletion(status);
//如果是嵌套事务,
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
//回滚到savepoint
status.rollbackToHeldSavepoint();
}
//如果是新事务
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
//抽象方法,子类实现
doRollback(status);
}
else {
// Participating in larger transaction
//是否参与进了一个更大的事务中去
if (status.hasTransaction()) {
//并且回滚标记被设置了
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
//调用子类实现的方法,将当前transaction object的状态设置为回滚
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
//触发回滚后的回调接口
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
//清理事务资源
cleanupAfterCompletion(status);
}
}
DataSourceTransactionManager实现的doRollback方法:
@Override
protected void doRollback(DefaultTransactionStatus status) {
//调用connection的rollback方法
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
}
try {
con.rollback();
}
catch (SQLException ex) {
throw translateException("JDBC rollback", ex);
}
}
清理资源做了啥:
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
//记录事务进行的状态为已完成
status.setCompleted();
//清理与当前事务相关的TransactionSynchronization
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}
//释放事务资源,并解除TransactionSynchronizationManager的资源绑定
//对于DataSourceTransactionManager来说是关闭数据库连接,然后解除Datasource对资源的绑定
if (status.isNewTransaction()) {
doCleanupAfterCompletion(status.getTransaction());
}
//如果之前有挂起的事务,恢复挂起的事务
if (status.getSuspendedResources() != null) {
if (status.isDebug()) {
logger.debug("Resuming suspended transaction after completion of inner transaction");
}
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
在来看看commIt:
@Override
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
//当前事务回滚标记是否被设置为true
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
//全局事务回滚标记是否被设置为true,如果最外层事务已经被标记为rollbackonly,那么就进行回滚
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
//执行事务提交操作
processCommit(defStatus);
}
processCommit执行事务提交源码:
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
//一些回调接口
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
//如果当前事务是嵌套事务,则释放savepoint
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
//如果是一个新事务
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
//由子类实现真正的事务提交逻辑
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
DataSourceTransactionManager实现的doCommit方法,调用connection.commit方法即可
@Override
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
con.commit();
}
catch (SQLException ex) {
throw translateException("JDBC commit", ex);
}
}
suspend是将某个事务挂起,那么挂起到底是怎么操作的呢?
@Nullable
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
//被挂起的事务是否还处于活跃状态
if (TransactionSynchronizationManager.isSynchronizationActive()) {
//首先暂停与当前线程相关的所有TransactionSynchronization
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
//暂停目标事务
suspendedResources = doSuspend(transaction);
}
//清空TransactionSynchronizationManager关于当前线程的一些事务记录
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
//SuspendedResourcesHolder保存被挂起的事务的所有信息状态,方便日后恢复使用
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
}
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// Neither transaction nor synchronization active.
return null;
}
}
DataSourceTransactionManager实现的doSuspend方法
@Override
protected Object doSuspend(Object transaction) {
//解除当前transaction对于ConnectionHolder的绑定关系
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
//解除TransactionSynchronizationManager与ConnectionHolder资源的绑定关系,然后返回对应的ConnectionHolder资源
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
resume是恢复某个被挂起的事务
protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
throws TransactionException {
//SuspendedResourcesHolder保存了事务被挂起前的状态,这里只需要从中读取然后进行恢复即可
if (resourcesHolder != null) {
//suspendedResources就是上面解绑的ConnectionHolder
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) {
//恢复事务,就是重新绑定一下ConnectionHolder
doResume(transaction, suspendedResources);
}
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
//设置TransactionSynchronizationManager关于当前事务的相关状态
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
//恢复suspendedSynchronizations
doResumeSynchronization(suspendedSynchronizations);
}
}
}
DataSourceTransactionManager实现的doResume方法:
protected void doResume(@Nullable Object transaction, Object suspendedResources) {
//就是再绑定一下ConnectionHolder
TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}
AbstractPlatformTransactionManager需要子类实现或者覆写的方法,对于各个子类来说,无非是根据自身需要管理的资源和事务管理的API提供这些方法的实现而已。
对于AbstractPlatformTransactionManager来说,他的设计思想很简单,就是将不变的结构都固定下来,例如: 一些常规校验逻辑,资源释放逻辑,而具体的实现,例如事务提交和回滚操作,不同的数据源会有不同的操作方法,因此这里通过抽象方法的形式暴露给子类来决定。
然后,为了提高可扩展性,还在各个执行阶段中,提供了一些回调接口,提供扩展性。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://cjdhy.blog.csdn.net/article/details/125052623
内容来源于网络,如有侵权,请联系作者删除!