Seata解析-锁管理器LockManager详解

x33g5p2x  于2021-12-21 转载在 其他  
字(9.3k)|赞(0)|评价(0)|浏览(587)

本文基于seata 1.3.0版本

之前在介绍分支事务注册的时候需要对记录主键加锁,这里加锁使用便是LockManager的实现类FileLockManager。本文将详细介绍seata的锁管理器。

一、LockManager

锁管理器的顶级接口是LockManager,锁管理器必须要实现该接口。

public interface LockManager {

    /** * 申请锁 */
    boolean acquireLock(BranchSession branchSession) throws TransactionException;

    /** * 释放锁 */
    boolean releaseLock(BranchSession branchSession) throws TransactionException;

    /** * 释放全局事务中每个分支事务的锁 */
    boolean releaseGlobalSessionLock(GlobalSession globalSession) throws TransactionException;

    /** * 查看是否对lockKey加了锁 */
    boolean isLockable(String xid, String resourceId, String lockKey) throws TransactionException;

    /** * 清理所有的锁,该方法还没有调用点 */
    void cleanAllLocks() throws TransactionException;
}

LockManager的方法相对比较简单,看方法名就可以知道其作用了。

二、AbstractLockManager

seata提供了LockManager的抽象实现类AbstractLockManager,如果我们自定义一个锁管理器,可以通过对该抽象类扩展。
除了释放锁和锁查询之外,AbstractLockManager的其他关键方法在文章《Seata解析-TC处理全局事务和分支事务原理详解之全局事务开启和分支事务注册》已经做过了介绍,本文不再介绍。下面看一下释放锁和锁查询这两个方法:

public boolean releaseLock(BranchSession branchSession) throws TransactionException {
        if (branchSession == null) {
            throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
        }
        List<RowLock> locks = collectRowLocks(branchSession);
        try {
            return getLocker(branchSession).releaseLock(locks);
        } catch (Exception t) {
            LOGGER.error("unLock error, branchSession:{}", branchSession, t);
            return false;
        }
    }

    @Override
    public boolean isLockable(String xid, String resourceId, String lockKey) throws TransactionException {
        if (StringUtils.isBlank(lockKey)) {
            // no lock
            return true;
        }
        List<RowLock> locks = collectRowLocks(lockKey, resourceId, xid);
        try {
            return getLocker().isLockable(locks);
        } catch (Exception t) {
            LOGGER.error("isLockable error, xid:{} resourceId:{}, lockKey:{}", xid, resourceId, lockKey, t);
            return false;
        }
    }

这两个方法都是先调用collectRowLocks方法解析lockKey,lockKey的规则在之前的文章里面也已经介绍过。解析后调用getLocker(),由该方法返回的对象进行后续处理。
这个getLocker()方法是本类的抽象方法,由子类去实现。目前AbstractLockManager有三个子类,分别是RedisLockManager、DataBaseLockManager、FileLockManager,其中FileLockManager最简单。

三、FileLockManager

FileLockManager继承自AbstractLockManager。
FileLockManager非常简单,只有两个方法:

public Locker getLocker(BranchSession branchSession) {
        return new FileLocker(branchSession);
    }

    @Override
    public boolean releaseGlobalSessionLock(GlobalSession globalSession) throws TransactionException {
    	//遍历全局事务下的每个分支事务,找到分支事务加的锁,然后释放
    	//释放锁调用的也是FileLocker的方法
        ArrayList<BranchSession> branchSessions = globalSession.getBranchSessions();
        boolean releaseLockResult = true;
        for (BranchSession branchSession : branchSessions) {
            //调用父类的releaseLock
            if (!this.releaseLock(branchSession)) {
                releaseLockResult = false;
            }
        }
        return releaseLockResult;
    }

FileLockManager实现了父类的getLocker(),通过之前的代码介绍可以看到,几乎所有与锁相关的方法最后都需要调用getLocker()。
在FileLockManager中,getLocker返回对象FileLocker。下面介绍一下FileLocker类。

1、FileLocker

下面是FileLocker的构造方法,从构造方法可以看出,该类只处理BranchSession,换句话说该类只用于对数据库记录的主键加锁:

public FileLocker(BranchSession branchSession) {
        this.branchSession = branchSession;
    }

该类的一些方法之前也介绍过,这里只介绍它的一个属性LOCK_MAP:

private static final ConcurrentMap<String/* resourceId */, ConcurrentMap<String/* tableName */, ConcurrentMap<Integer/* bucketId */, BucketLockMap>>>
        LOCK_MAP = new ConcurrentHashMap<>();

LOCK_MAP是static的,该属性是一个全局属性。LOCK_MAP的作用是记录TC端所有的数据库记录锁,其存储的数据映射结构是:

数据库资源->表名->桶->BucketLockMap

数据库资源指的是数据库,对记录加锁时,seata会计算记录主键的哈希值,然后将其分散到128个桶中,BucketLockMap可以简单的理解为Map对象,是记录主键值与transactionId的对应关系,如果某个记录属于1号桶,那么就将该记录的主键值和加锁的transactionId添加到1号桶的BucketLockMap对象中。
每个分支事务注册的时候如果对记录加锁,那么就要在LOCK_MAP中增加一条锁记录。客户端查询某个记录是否加锁时,也是查看LOCK_MAP。
因为LOCK_MAP存放在内存中,所以FileLocker提供的其实是内存类型的锁。

四、DataBaseLockManager

DataBaseLockManager是将锁信息存储到了数据库中,因此需要首先在数据库中建一张记录锁的表,该表包含的字段如下:

xid, transaction_id, branch_id, resource_id, table_name, pk, row_key, gmt_create, gmt_modified

其他字段看名字就可以知道其含义,这里只介绍后面几个字段的含义:

  • pk:主键值
  • row_key:resource_id+table_name+pk的组合
  • gmt_create:当前记录的创建时间
  • gmt_modified:当前记录的最后修改时间

具体的建表语句可以在script模块的server/db目录下找到。
表名默认是lock_table,可以通过store.db.lockTable设置其他表名。
下面来看一下DataBaseLockManager的初始化方法:

//init方法在SPI创建对象后初始化时调用的,SPI会自动初始化实现Initialize接口的类
    public void init() {
        // init dataSource
        //datasourceType表示访问数据库,使用哪种连接池,seata提供了三种:dbcp,druid,hihari
        String datasourceType = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_DB_DATASOURCE_TYPE);
        //DataSourceProvider主要作用是创建出指定类型的DataSource对象
        DataSource lockStoreDataSource = EnhancedServiceLoader.load(DataSourceProvider.class, datasourceType).provide();
        //创建数据库锁对象
        locker = new DataBaseLocker(lockStoreDataSource);
    }

从上面的代码可以看出,DataBaseLockManager对记录加锁使用的是DataBaseLocker对象。
下面是DataBaseLocker的构造方法:

public DataBaseLocker(DataSource logStoreDataSource) {
        lockStore = new LockStoreDataBaseDAO(logStoreDataSource);
    }

在构造方法中又创建了LockStoreDataBaseDAO,其实这个LockStoreDataBaseDAO对象类似于FileLocker的LOCK_MAP的作用,DataBaseLocker存取锁到数据库都是通过LockStoreDataBaseDAO完成的,所以下面重点分析LockStoreDataBaseDAO。

public LockStoreDataBaseDAO(DataSource lockStoreDataSource) {
    	//数据源
        this.lockStoreDataSource = lockStoreDataSource;
        //存储锁的表名,默认是lock_table
        //可以通过store.db.lockTable设置表名
        lockTable = CONFIG.getConfig(ConfigurationKeys.LOCK_DB_TABLE, DEFAULT_LOCK_DB_TABLE);
        //dbType表示使用的是什么数据库,可以mysql,oracle等
        //通过store.db.dbType设置
        dbType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_TYPE);
        if (StringUtils.isBlank(dbType)) {
            throw new StoreException("there must be db type.");
        }
        if (lockStoreDataSource == null) {
            throw new StoreException("there must be lockStoreDataSource.");
        }
    }

本文只分析一下加锁的流程。
加锁是通过acquireLock方法完成的。
首先将加锁信息的RowLock对象转化为LockDO对象,LockDO对象的属性与LOCK_TABLE的字段一一对应,转换之后根据LockDO对象的rowKey查询表LOCK_TABLE是否已经有加锁数据,如果没有加锁或者加锁的事务是当前事务,那么认为可以加锁,接下来将LockDO对象插入表LOCK_TABLE中,否则认为加锁失败,将刚才加过的锁解锁,然后返回加锁失败。下面看一下代码:

//convertToLockDO是将RowLock对象转化为LockDO对象
	protected List<LockDO> convertToLockDO(List<RowLock> locks) {
        List<LockDO> lockDOs = new ArrayList<>();
        if (CollectionUtils.isEmpty(locks)) {
            return lockDOs;
        }
        for (RowLock rowLock : locks) {
            LockDO lockDO = new LockDO();
            lockDO.setBranchId(rowLock.getBranchId());
            lockDO.setPk(rowLock.getPk());
            lockDO.setResourceId(rowLock.getResourceId());
            lockDO.setRowKey(getRowKey(rowLock.getResourceId(), rowLock.getTableName(), rowLock.getPk()));
            lockDO.setXid(rowLock.getXid());
            lockDO.setTransactionId(rowLock.getTransactionId());
            lockDO.setTableName(rowLock.getTableName());
            lockDOs.add(lockDO);
        }
        return lockDOs;
    }
    public boolean acquireLock(List<LockDO> lockDOs) {
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        Set<String> dbExistedRowKeys = new HashSet<>();
        boolean originalAutoCommit = true;
        if (lockDOs.size() > 1) {
            //过滤掉重复的加锁记录
            lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
        }
        try {
            //获得连接
            conn = lockStoreDataSource.getConnection();
            if (originalAutoCommit = conn.getAutoCommit()) {
                conn.setAutoCommit(false);
            }
            //check lock
            //使用StringJoiner对象构建SQL语句的部分内容,分隔符是“,”
            StringJoiner sj = new StringJoiner(",");
            for (int i = 0; i < lockDOs.size(); i++) {
                sj.add("?");
            }
            boolean canLock = true;
            //query
            //构建检查当前记录是否已经加锁的SQL语句
            //checkLockSQL=select xid, transaction_id, branch_id, resource_id, table_name, pk, row_key, gmt_create, gmt_modified
            // from lock_table where row_key in ('?','?','?')
            String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, sj.toString());
            ps = conn.prepareStatement(checkLockSQL);
            for (int i = 0; i < lockDOs.size(); i++) {
                ps.setString(i + 1, lockDOs.get(i).getRowKey());
            }
            //执行上面的checkLockSQL语句
            rs = ps.executeQuery();
            String currentXID = lockDOs.get(0).getXid();
            //遍历数据库的查询结果,将当前事务的XID与查询结果中每条记录的XID进行比较,
            //查询结果中每条记录的XID表示之前已经有事务XID对该条记录加过锁了
            //如果记录已经被其他事务加锁,则设置canLock=false
            while (rs.next()) {
                String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);
                if (!StringUtils.equals(dbXID, currentXID)) {
                    if (LOGGER.isInfoEnabled()) {
                        String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);
                        String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);
                        Long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);
                        LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID,
                            dbBranchId);
                    }
                    canLock &= false;
                    break;
                }
                //记录所有已经加过锁的row_key
                dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));
            }

            if (!canLock) {
                //如果有记录已经加过锁,则回滚退出
                conn.rollback();
                return false;
            }
            List<LockDO> unrepeatedLockDOs = null;
            if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
                unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey()))
                    .collect(Collectors.toList());
            } else {
                unrepeatedLockDOs = lockDOs;
            }
            if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {
                //表示都已经加过锁,无需再加锁
                conn.rollback();
                return true;
            }
            //lock
            //将LockDO对象插入到数据库中
            if (unrepeatedLockDOs.size() == 1) {
                LockDO lockDO = unrepeatedLockDOs.get(0);
                if (!doAcquireLock(conn, lockDO)) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());
                    }
                    conn.rollback();
                    return false;
                }
            } else {
                if (!doAcquireLocks(conn, unrepeatedLockDOs)) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(),
                            unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));
                    }
                    conn.rollback();
                    return false;
                }
            }
            //数据库提交
            conn.commit();
            return true;
        } catch (SQLException e) {
            throw new StoreException(e);
        } finally {
            IOUtil.close(rs, ps);
            if (conn != null) {
                try {
                    if (originalAutoCommit) {
                        conn.setAutoCommit(true);
                    }
                    conn.close();
                } catch (SQLException e) {
                }
            }
        }
    }

五、RedisLockManager

RedisLockManager的整体的结构与之前介绍DataBaseLockManager比较类似。
本小节只说明其加锁的流程。
首先也是将RowLock对象转化为LockDO,然后将“SEATA_LOCK_”与每个LockDO对象的rowLock属性拼接在一起作为redis的key,查看是否redis中是否已经有了该key,如果没有说明没有加锁,则将LockDO对象转化为JSON格式作为value连同上面的key一起存入redis。如果有key说明已经加锁了,则返回加锁失败。

相关文章