spring使用mybatis集成Atomikos管理多数据源的分布式事务

x33g5p2x  于2021-10-14 转载在 Spring  
字(12.0k)|赞(0)|评价(0)|浏览(448)

spring使用mybatis集成Atomikos

开发原因

在Java后端开发过程中,事务控制非常重要,而Spring为我们提供了方便的声明式事务注解@Transactional。但是默认的Spring事务只支持单数据源,而实际上一个系统往往需要写多个数据源,这个时候我们就需要考虑如何通过Spring实现对分布式事务的支持。

开发思路

对于数据库层面的分布式事务而言,JTA(Java Transaction API,XA规范的Java实现方案)是一个不错的解决方案。通常JTA需要应用服务器的支持,但在查阅SpringBoot的文档时发现,它推荐了Atomikos和Bitronix两种无需应用服务器支持的分布式事务组件。在这两个组件中,Atomikos更受大家的好评,所以我选择使用Atomikos。

Atomikos实践

项目结构

(项目结构图:image-20211014152158297.png。原文图片转存失败,此处无法显示。)

sql脚本

mybatis_test库

-- Log table, created in the `mybatis_test` schema.
CREATE TABLE `log_info` (
  -- Surrogate primary key, assigned by MySQL on insert.
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `create_time` datetime DEFAULT NULL,
  `content` longtext,
  PRIMARY KEY (`id`)
-- NOTE(review): AUTO_INCREMENT=5 looks like residue from a schema dump; confirm whether it is needed.
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8;

redis_test库

-- Order table, created in the `redis_test` schema.
CREATE TABLE `order_info` (
  -- Surrogate primary key, assigned by MySQL on insert.
  `id` int(11) NOT NULL AUTO_INCREMENT,
  -- NOTE(review): a floating-point column for a monetary amount can lose precision; confirm this is acceptable.
  `money` double NOT NULL,
  `user_id` varchar(20) DEFAULT NULL,
  `address` varchar(200) DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
-- NOTE(review): AUTO_INCREMENT=7 looks like residue from a schema dump; confirm whether it is needed.
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

配置文件 application.yml

# Connection settings for the two MySQL schemas used by this example.
spring:
  datasource:
    # NOTE(review): the configuration classes in this article build MysqlXADataSource
    # instances by hand, so this Druid XA type does not appear to be used; confirm.
    type: com.alibaba.druid.pool.xa.DruidXADataSource
    druid:
      # Read by MybatisDatasourceConfig (prefix "spring.datasource.druid.master").
      # That class declares only url, username and password fields, so
      # driver-class-name and maxPoolSize are presumably not applied; verify.
      master:
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/mybatis_test?autoReconnect=true&useSSL=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull
        username: root
        password: root
        maxPoolSize: 5
      # Read by MybatisDatasourceConfig2 (prefix "spring.datasource.druid.slave").
      # The same remark about driver-class-name and maxPoolSize applies here.
      slave:
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/redis_test?autoReconnect=true&useSSL=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull
        username: root
        password: root
        maxPoolSize: 5

pom文件

<dependencies>
        <!-- Spring MVC / embedded web server -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Druid: referenced by spring.datasource.type in application.yml -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.6</version>
        </dependency>

        <!-- Generates metadata for @ConfigurationProperties classes (IDE assistance) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Lombok: the configuration, entity and service classes use @Data and @Slf4j.
             No version is given, like the other Boot starters here; this assumes the
             version is managed by the Spring Boot parent/BOM. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Atomikos JTA transaction manager integration -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>

        <!-- Connector/J 5.1.x: provides com.mysql.jdbc.jdbc2.optional.MysqlXADataSource -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.3</version>
        </dependency>

        <!-- The test class uses the JUnit 4 API (org.junit.Test, @RunWith(SpringRunner.class)),
             so junit-vintage-engine must NOT be excluded here.
             NOTE(review): on Spring Boot 2.4+ the starter no longer ships the vintage engine;
             there it would have to be declared explicitly with test scope. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置类

这里由于配置了两个数据源,所以每一个数据源对应一个SqlSessionFactoryBean,并且各自管理自己的mapper;然后配置全局的事务管理器,并将事务管理器纳入到Spring容器当中。
注意:同一个数据库的mapper要放在同一个包里面。

package com.mye.hl15springatomikos.config;
 

import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
import lombok.Data;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
 
import javax.sql.DataSource;
import java.sql.SQLException;

/**
 * MyBatis wiring for the first data source.
 *
 * <p>Binds {@code url}, {@code username} and {@code password} from the
 * {@code spring.datasource.druid.master} section of application.yml, wraps a
 * MySQL XA data source in an Atomikos bean, and registers a dedicated
 * {@link SqlSessionFactory} / {@link SqlSessionTemplate} pair for the mappers
 * in {@code com.mye.hl15springatomikos.mapper.log}.
 *
 * <p>NOTE(review): application.yml also declares {@code driver-class-name} and
 * {@code maxPoolSize}; this class has no matching fields, so those values are
 * presumably not applied. Confirm whether the pool size should be configured.
 */
@Data   // Lombok generates the setters used to bind url/username/password; per the original author they stay null without it
@Configuration  // marks this class as a Spring configuration class
@EnableConfigurationProperties(MybatisDatasourceConfig.class)   // per the original author, added to clear the IDE warning on @ConfigurationProperties
@ConfigurationProperties(prefix = "spring.datasource.druid.master") // reads this section of application.yml
@MapperScan(basePackages = "com.mye.hl15springatomikos.mapper.log", // mapper package served by this data source
        sqlSessionFactoryRef = "firstSqlSessionFactory",
        sqlSessionTemplateRef = "firstSqlSessionTemplate")
public class MybatisDatasourceConfig {

    private String url;
    private String username;
    private String password;

    /**
     * Builds the first XA data source and hands it to Atomikos.
     *
     * @return an {@link AtomikosDataSourceBean} registered under the unique
     *         resource name {@code firstDataSource}
     * @throws SQLException declared for the driver's configuration calls
     */
    @Primary
    @Bean(name = "firstDataSource")
    public DataSource firstDataSource() throws SQLException {
        // Native MySQL XA data source, configured from the bound properties.
        MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
        mysqlXaDataSource.setUrl(url);
        mysqlXaDataSource.setUser(username);
        mysqlXaDataSource.setPassword(password);
        // Set once; the original called this setter twice with the same value.
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);

        // Let Atomikos manage the MySQL XA data source.
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXaDataSource);
        xaDataSource.setUniqueResourceName("firstDataSource");
        return xaDataSource;
    }

    /**
     * Creates the {@link SqlSessionFactory} for this data source; each data
     * source gets its own {@link SqlSessionFactoryBean}.
     *
     * @param dataSource the {@code firstDataSource} bean
     * @return the factory built by {@link SqlSessionFactoryBean#getObject()}
     * @throws Exception if the factory cannot be built
     */
    @Bean(name = "firstSqlSessionFactory")
    public SqlSessionFactory firstSqlSessionFactory(@Qualifier("firstDataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        // The mapper in the scanned package is annotation-based, so no XML locations are set.
        // If XML mappers are added, register them here, e.g.
        // bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:/mapper/log/*Mapper.xml"));
        return bean.getObject();
    }

    /**
     * Creates the {@link SqlSessionTemplate} backed by {@code firstSqlSessionFactory}.
     *
     * @param sqlSessionFactory the {@code firstSqlSessionFactory} bean
     * @return a new template bound to that factory
     * @throws Exception declared by the original signature
     */
    @Bean(name = "firstSqlSessionTemplate")
    public SqlSessionTemplate testSqlSessionTemplate(
            @Qualifier("firstSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
package com.mye.hl15springatomikos.config;
 

import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
import lombok.Data;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
import javax.sql.DataSource;
import java.sql.SQLException;

/**
 * MyBatis wiring for the second data source.
 *
 * <p>Binds {@code url}, {@code username} and {@code password} from the
 * {@code spring.datasource.druid.slave} section of application.yml, wraps a
 * MySQL XA data source in an Atomikos bean, and registers a dedicated
 * {@link SqlSessionFactory} / {@link SqlSessionTemplate} pair for the mappers
 * in {@code com.mye.hl15springatomikos.mapper.order}.
 *
 * <p>NOTE(review): application.yml also declares {@code driver-class-name} and
 * {@code maxPoolSize}; this class has no matching fields, so those values are
 * presumably not applied. Confirm whether the pool size should be configured.
 */
@Data   // Lombok generates the setters used to bind url/username/password; per the original author they stay null without it
@Configuration   // marks this class as a Spring configuration class
@ConfigurationProperties(prefix = "spring.datasource.druid.slave") // reads this section of application.yml
@EnableConfigurationProperties(MybatisDatasourceConfig2.class)    // per the original author, added to clear the IDE warning on @ConfigurationProperties
@MapperScan(basePackages = "com.mye.hl15springatomikos.mapper.order", // mapper package served by this data source
        sqlSessionFactoryRef = "secondSqlSessionFactory",
        sqlSessionTemplateRef = "secondSqlSessionTemplate"
)
public class MybatisDatasourceConfig2 {

    private String url;
    private String username;
    private String password;

    /**
     * Builds the second XA data source and hands it to Atomikos.
     *
     * @return an {@link AtomikosDataSourceBean} registered under the unique
     *         resource name {@code secondDataSource}
     * @throws SQLException declared for the driver's configuration calls
     */
    @Bean(name = "secondDataSource")
    public DataSource secondDataSource() throws SQLException {
        // Native MySQL XA data source, configured from the bound properties.
        MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
        mysqlXaDataSource.setUrl(url);
        mysqlXaDataSource.setUser(username);
        mysqlXaDataSource.setPassword(password);
        // Set once; the original called this setter twice with the same value.
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);

        // Let Atomikos manage the MySQL XA data source.
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXaDataSource);
        xaDataSource.setUniqueResourceName("secondDataSource");
        return xaDataSource;
    }

    /**
     * Creates the {@link SqlSessionFactory} for this data source; each data
     * source gets its own {@link SqlSessionFactoryBean}.
     *
     * @param dataSource the {@code secondDataSource} bean
     * @return the factory built by {@link SqlSessionFactoryBean#getObject()}
     * @throws Exception if the factory cannot be built
     */
    @Bean(name = "secondSqlSessionFactory")
    public SqlSessionFactory secondSqlSessionFactory(@Qualifier("secondDataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        // The mapper in the scanned package is annotation-based, so no XML locations are set.
        // If XML mappers are added, register them here, e.g.
        // bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:/mapper/order/*Mapper.xml"));
        return bean.getObject();
    }

    /**
     * Creates the {@link SqlSessionTemplate} backed by {@code secondSqlSessionFactory}.
     *
     * @param sqlSessionFactory the {@code secondSqlSessionFactory} bean
     * @return a new template bound to that factory
     * @throws Exception declared by the original signature
     */
    @Bean(name = "secondSqlSessionTemplate")
    public SqlSessionTemplate testSqlSessionTemplate(
            @Qualifier("secondSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

实体类

LogInfo.java

package com.mye.hl15springatomikos.pojo;
 
import lombok.Data;
 
import java.util.Date;

/**
 * Row of the {@code log_info} table.
 * Accessors, {@code toString}, {@code equals} and {@code hashCode} come from Lombok's {@code @Data}.
 */
@Data
public class LogInfo {
 
    // Primary key; LogMapper's insert writes the generated key back into this property.
    private Integer id;
 
    // Written to the create_time column by LogMapper's insert.
    private Date createTime;
 
    // Written to the content column by LogMapper's insert.
    private String content;
}

OrderInfo.java

package com.mye.hl15springatomikos.pojo;
 
import lombok.Data;
 
import java.util.Date;

/**
 * Row of the {@code order_info} table.
 * Accessors, {@code toString}, {@code equals} and {@code hashCode} come from Lombok's {@code @Data}.
 */
@Data
public class OrderInfo {
 
    // Primary key; OrderMapper's insert writes the generated key back into this property.
    private Integer id;
 
    // Written to the money column.
    // NOTE(review): Double for a monetary amount can lose precision; confirm this is acceptable.
    private Double money;
 
    // Written to the user_id column.
    private String userId;
 
    // Written to the address column.
    private String address;
 
    // Written to the create_time column.
    private Date createTime;
 
}

mapper层

LogMapper.java

package com.mye.hl15springatomikos.mapper.log;
 
import com.mye.hl15springatomikos.pojo.LogInfo;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Options;

@Repository
public interface LogMapper {

    /** * 新增 */
    @Insert("INSERT INTO log_info(create_time,content) VALUES(#{createTime}, #{content})")
    @Options(useGeneratedKeys = true, keyProperty = "id",keyColumn = "id")
    int insertLogInfo(LogInfo logInfo);

}

OrderMapper.java

package com.mye.hl15springatomikos.mapper.order;
 
import com.mye.hl15springatomikos.pojo.OrderInfo;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Options;
 
@Repository
public interface OrderMapper {
 
    /** * 新增 */
    @Insert("INSERT INTO order_info (money,user_id,address,create_time) VALUES(#{money},#{userId},#{address},#{createTime})")
    @Options(useGeneratedKeys = true,keyProperty = "id",keyColumn = "id")
    int insertOrderInfo(OrderInfo orderInfo);

}

service层

OrderService.java

/**
 * Service contract for creating orders.
 */
public interface OrderService {

    /**
     * Adds a new order.
     *
     * @param orderInfo the order to add
     */
    void addOrder(OrderInfo orderInfo);
}

OrderServiceImpl.java

package com.mye.hl15springatomikos.service.impl;
 
 

import com.mye.hl15springatomikos.mapper.log.LogMapper;
import com.mye.hl15springatomikos.mapper.order.OrderMapper;
import com.mye.hl15springatomikos.pojo.LogInfo;
import com.mye.hl15springatomikos.pojo.OrderInfo;
import com.mye.hl15springatomikos.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.transaction.Transactional;
import java.util.Date;
import java.util.Random;

/**
 * {@link OrderService} implementation that writes through two mappers:
 * {@link OrderMapper} for the order row and {@link LogMapper} for a log row.
 *
 * <p>The class is annotated with {@code @Transactional(rollbackOn = Exception.class)};
 * the commented-out division-by-zero lines below are the article's manual
 * rollback tests.
 */
@Slf4j
@Service
@Transactional(rollbackOn = Exception.class)
public class OrderServiceImpl implements OrderService {

    @Autowired
    OrderMapper orderMapper;

    @Autowired
    LogMapper logMapper;

    /**
     * Inserts the order, then inserts a log row whose content is the order's
     * {@code toString()} value.
     *
     * @param orderInfo the order to persist
     */
    @Override
    public void addOrder(OrderInfo orderInfo) {
        int insert = orderMapper.insertOrderInfo(orderInfo);
        log.info("order库新增sql执行条数:{}",insert);

        // Rollback test 1: uncomment to fail after the order insert.
        // int i=1/0;

        // No id is assigned here: the insert statement omits the id column and
        // the generated key is written back by the mapper, so the random id the
        // original set was never used.
        LogInfo logInfo = new LogInfo();
        logInfo.setCreateTime(new Date());
        logInfo.setContent(orderInfo.toString());
        int insert1 = logMapper.insertLogInfo(logInfo);
        log.info("logs库新增sql执行条数:{}",insert1);

        // Rollback test 2: uncomment to fail after both inserts.
        // int i=1/0;
    }
}

启动类

/**
 * Spring Boot entry point for the Atomikos multi-data-source example.
 */
@SpringBootApplication
public class Hl15SpringAtomikosApplication {

    /**
     * Starts the application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Hl15SpringAtomikosApplication.class, args);
    }
}

测试类

package com.mye.hl15springatomikos;

import com.mye.hl15springatomikos.pojo.OrderInfo;
import com.mye.hl15springatomikos.service.OrderService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Date;

/**
 * Boots the application context and drives {@link OrderService#addOrder}
 * once with a sample order. The test makes no assertions; the outcome is
 * checked by inspecting the two databases.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Hl15SpringAtomikosApplication.class)
public class OrderServiceTest {

    @Autowired
    private OrderService orderService;

    /** Submits one sample order through the service. */
    @Test
    public void test01() {
        final OrderInfo sample = new OrderInfo();
        sample.setUserId("1");
        sample.setAddress("wuhan");
        sample.setMoney(1.1d);
        sample.setCreateTime(new Date());

        orderService.addOrder(sample);
    }
}

相关文章