分享

Spring Boot+MyBatis+Atomikos+MySQL(附源码)

 田维常 2023-05-14 发布于广东

我们在实际项目中,尽量规避分布式事务。但是,有些时候是真的需要做一些服务拆分从而会引出分布式事务问题。

同时,分布式事务也是面试中市场被问,可以拿着这个案例练练手,面试就可以说上个123了。

这里举个业务板栗:用户领取优惠券,需要扣减用户领取次数,然后记录一个用户领取优惠券记录。

拆分前
拆分后

原本这里可以使用消息队列方式,采用异步化去新增用户领取记录。但是,这里需求是就是需要用户领完立马就能查看到自己的领取记录,那我们这里就引入了Atomikos来实现分布式事务问题。

分布式事务

分布式事务是指跨越多个计算机或数据库的事务,这些计算机或数据库之间可能存在网络延迟、故障或不一致性的情况。分布式事务需要保证所有操作的原子性、一致性、隔离性和持久性,以确保数据的正确性和完整性。

分布式事务协议有哪些?

分布式事务协议主要有两种:2PC(Two-Phase Commit)和3PC(Three-Phase Commit)。

2PC是目前最常用的分布式事务协议,其流程分为两个阶段:准备阶段和提交阶段。在准备阶段,事务协调者向所有参与者发出准备请求,参与者将本地事务执行到prepare状态,并将prepare结果返回给事务协调者。在提交阶段,如果所有参与者都执行成功,则事务协调者向所有参与者发出提交请求,参与者将本地事务提交,否则事务协调者向所有参与者发出回滚请求,参与者将本地事务回滚。

3PC是2PC的改进版,其在2PC的基础上增加了一个准备提交阶段。在准备提交阶段,协调者向参与者询问是否可以提交,如果参与者返回同意,则在提交阶段直接提交,否则在提交阶段回滚。

分布式事务常见解决方案有哪些?

分布式事务解决实现方案有:

  • 基于消息队列的分布式事务方案(如RocketMQ的开源方案)
  • 基于分布式事务框架的分布式事务方案(如Seata、TCC-Transaction等框架)
  • 基于XA协议的分布式事务方案(如JTA等)
  • 基于可靠消息最终一致性的分布式事务方案(如阿里巴巴的分布式事务中间件GTS)
  • 基于CAP原理的分布式事务方案(如CQRS架构中的事件溯源模式)

什么是JTA ?

JTA(Java Transaction API),是J2EE的编程接口规范,它是XA协议的JAVA实现。它主要定义了:

一个事务管理器的接口javax.transaction.TransactionManager,定义了有关事务的开始、提交、撤回等>操作。

一个满足XA规范的资源定义接口javax.transaction.xa.XAResource,一种资源如果要支持JTA事务,就需要让它的资源实现该XAResource接口,并实现该接口定义的两阶段提交相关的接口。 如果我们有一个应用,它使用JTA接口实现事务,应用在运行的时候,就需要一个实现JTA的容器,一般情况下,这是一个J2EE容器,像JBoss,Websphere等应用服务器。

但是,也有一些独立的框架实现了JTA,例如Atomikos, bitronix都提供了jar包方式的JTA实现框架。这样我们就能够在Tomcat或者Jetty之类的服务器上运行使用JTA实现事务的应用系统。

在上面的本地事务和外部事务的区别中说到,JTA事务是外部事务,可以用来实现对多个资源的事务性。它正是通过每个资源实现的XAResource来进行两阶段提交的控制。感兴趣的同学可以看看这个接口的方法,除了commit, rollback等方法以外,还有end(), forget(), isSameRM(), prepare()等等。光从这些接口就能够想象JTA在实现两阶段事务的复杂性。

什么是XA?

XA是由X/Open组织提出的分布式事务的架构(或者叫协议)。XA架构主要定义了(全局)事务管理器(Transaction Manager)和(局部)资源管理器(Resource Manager)之间的接口。XA接口是双向的系统接口,在事务管理器(Transaction Manager)以及一个或多个资源管理器(Resource Manager)之间形成通信桥梁。也就是说,在基于XA的一个事务中,我们可以针对多个资源进行事务管理,例如一个系统访问多个数据库,或即访问数据库、又访问像消息中间件这样的资源。这样我们就能够实现在多个数据库和消息中间件直接实现全部提交、或全部取消的事务。XA规范不是java的规范,而是一种通用的规范, 目前各种数据库、以及很多消息中间件都支持XA规范。

JTA是满足XA规范的、用于Java开发的规范。所以,当我们说,使用JTA实现分布式事务的时候,其实就是说,使用JTA规范,实现系统内多个数据库、消息中间件等资源的事务。

什么是Atomikos

Atomikos是一个非常流行的开源事务管理器,并且可以嵌入到你的Spring Boot应用中。Tomcat应用服务器没有实现JTA规范,当使用Tomcat作为应用服务器的时候,需要使用第三方的事务管理器类来作为全局的事务管理器,而Atomikos框架就是这个作用,将事务管理整合到应用中,而不依赖于application server。

Spring Boot 集成Atomikos

说一堆的理论没什么用,show me the code。

技术栈:Spring Boot+MyBatis+Atomikos+MySQL

如果你按照本文代码,注意你的mysql版本。

首先建好两个数据库(my-db_0和my-db_1),然后每个库里各建一张表。

数据库my-db_0中:

CREATE TABLE `t_user_0` (
`id` bigint NOT NULL AUTO_INCREMENT,
`user_name` varchar(32CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`age` int NOT NULL,
`gender` int NOT NULL,
PRIMARY KEY (`id`)
ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8;

数据库my-db_1中:

CREATE TABLE `t_user_1` (
`id` bigint NOT NULL AUTO_INCREMENT,
`user_name` varchar(32CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`age` int NOT NULL,
`gender` int NOT NULL,
PRIMARY KEY (`id`)
ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8;

这里只是为了演示分布式事务,不用在意表的具体含义。

整体项目结构

项目整体结构

maven配置

<project xmlns="http://maven./POM/4.0.0" xmlns:xsi="http://www./2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven./POM/4.0.0 http://maven./xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.tian</groupId>
    <artifactId>spring-boot-atomikos</artifactId>
    <version>1.0-SNAPSHOT</version>

    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <name>spring-boot-atomikos</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- mybatis依赖 -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- mysql依赖 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>
        <!--分布式事务-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- 要使生成的jar可运行,需要加入此插件 -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <excludes>
                    <exclude>**/*.java</exclude>
                </excludes>
            </resource>
            <resource>
                <!-- 编译xml文件 -->
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.*</include>
                </includes>
            </resource>
        </resources>
    </build>
</project>

properties配置

server.port=9001
spring.application.name=atomikos-demo

spring.datasource.user0.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.user0.url=jdbc:mysql://localhost:3306/my-db_0?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true
spring.datasource.user0.user=root
spring.datasource.user0.password=123456

spring.datasource.user1.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.user1.url=jdbc:mysql://localhost:3306/my-db_1?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true
spring.datasource.user1.user=root
spring.datasource.user1.password=123456

mybatis.mapperLocations=classpath:/com/tian/mapper/*/*.xml
mybatis.typeAliasesPackage=com.tian.entity
mybatis.configuration.cache-enabled=true

数据源

/**
 * @author tianwc  公众号:java后端技术全栈、面试专栏
 * @version 1.0.0
 * @date 2023年05月11日 19:38
 * 博客地址:<a href="http:///">博客地址</a>
 * <p>
 * 配置好两个数据源
 */

@Configuration
public class DataSourceConfig {

    // 将这个对象放入spring容器中(交给Spring管理)
    @Bean
    // 读取 application.yml 中的配置参数映射成为一个对象
    @ConfigurationProperties(prefix = "spring.datasource.user0")
    public XADataSource getDataSource0() {
        // 创建XA连接池
        return new MysqlXADataSource();
    }

    /**
     * 创建Atomikos数据源
     * 注解@DependsOn("druidXADataSourcePre"),在名为druidXADataSourcePre的bean实例化后加载当前bean
     */

    @Bean
    @DependsOn("getDataSource0")
    @Primary
    public DataSource dataSourcePre(@Qualifier("getDataSource0") XADataSource xaDataSource) {
        //这里的AtomikosDataSourceBean使用的是spring提供的
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSource(xaDataSource);
        atomikosDataSourceBean.setMaxPoolSize(20);
        return atomikosDataSourceBean;
    }


    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.user1")
    public XADataSource getDataSource1() {
        // 创建XA连接池
        return new MysqlXADataSource();
    }

    @Bean
    @DependsOn("getDataSource1")
    public DataSource dataSourceSit(@Qualifier("getDataSource1") XADataSource xaDataSource) {
        //这里的AtomikosDataSourceBean使用的是spring提供的
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSource(xaDataSource);
        return atomikosDataSourceBean;
    }
}

MyBatis扫描

@Configuration
@MapperScan(basePackages = {"com.tian.mapper.user0"}, sqlSessionTemplateRef = "preSqlSessionTemplate")
public class MybatisPreConfig {

    @Autowired
    @Qualifier("dataSourcePre")
    private DataSource dataSource;

    /**
     * 创建 SqlSessionFactory
     */

    @Bean
    @Primary
    public SqlSessionFactory preSqlSessionFactory() throws Exception{
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().
                getResources("classpath*:com/tian/mapper/user0/*.xml"));
        return bean.getObject();
    }

    /**
     * 通过 SqlSessionFactory 来创建 SqlSessionTemplate
     */

    @Bean
    @Primary
    public SqlSessionTemplate preSqlSessionTemplate(@Qualifier("preSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
        // SqlSessionTemplate是线程安全的,可以被多个DAO所共享使用
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

另外一个基本一样,就是扫描路径改成:

("classpath*:com/tian/mapper/user1/*.xml")

mapper.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-////DTD Mapper 3.0//EN" "http:///dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.tian.mapper.user0.User0Mapper">

    <!-- -->
    <cache eviction="LRU" flushInterval="10000" size="1024"  />

    <resultMap id="BaseResultMap" type="com.tian.entity.User0">
        <id column="id" jdbcType="BIGINT" property="id" />
        <result column="user_name" jdbcType="VARCHAR" property="userName" />
        <result column="age" jdbcType="INTEGER" property="age" />
        <result column="gender" jdbcType="INTEGER" property="gender" />
    </resultMap>

    <sql id="Base_Column_List">
        id, user_name, age, gender
    </sql>
    <insert id="insert" parameterType="com.tian.entity.User0">
        insert into t_user_0 (id, user_name,age, gender)
        values (#{id,jdbcType=BIGINT}, #{userName,jdbcType=VARCHAR},#{age,jdbcType=INTEGER},#{gender,jdbcType=INTEGER})
    </insert>

</mapper>

另外一个基本完全一样,这里就贴出来了。

对应mapper接口 也非常简单,贴出一个:

public interface User0Mapper {

    int insert(User0 record);
}

service

/**
 * @author tianwc  公众号:java后端技术全栈、面试专栏
 * @version 1.0.0
 * @date 2023年05月11日 19:38
 * 博客地址:<a href="http:///">博客地址</a>
 * <p>
 * 模拟三种场景:正常、制造异常、数据库异常
 */

@Service
public class UserServiceImpl implements UserService {

    @Resource
    private User0Mapper user0Mapper;
    @Resource
    private User1Mapper user1Mapper;
    /**
     * 正常逻辑 同时对两个数据库进行 插入数据
     */

    @Transactional
    @Override
    public int transaction1() throws Exception {
        User1 user1 = new User1();
        user1.setUserName("22222");
        user1.setAge(11);
        user1.setGender(0);
        user1Mapper.add(user1);
        System.out.println("---------------------------");
        // sit(数据源1)
        User0 user0 = new User0();
        user0.setUserName("111111");
        user0.setAge(11);
        user0.setGender(0);
        user0Mapper.insert(user0);
        return 1;
    }
    /**
     * 正常逻辑 同时对两个数据库进行 插入数据
     * 数据插入完后  出现异常
     */

    @Transactional
    @Override
    public int transaction2() throws Exception {
        User1 user1 = new User1();
        user1.setUserName("22222");
        user1.setAge(11);
        user1.setGender(0);
        user1Mapper.add(user1);
        System.out.println("---------------------------");
        // sit(数据源1)
        User0 user0 = new User0();
        user0.setUserName("111111");
        user0.setAge(11);
        user0.setGender(0);
        user0Mapper.insert(user0);
        //认为制造一个异常
        int a=1/0;
        return 1;
    }

    /**
     * 第一个数据插入成功  第二个数据插入失败
     */

    @Transactional
    @Override
    public int transaction3() throws Exception {
        User1 user1 = new User1();
        user1.setUserName("22222");
        user1.setAge(11);
        user1.setGender(0);
        user1Mapper.add(user1);
        System.out.println("---------------------------");
        // sit(数据源1)
        User0 user0 = new User0();
       //故意搞长点,模拟插入失败 让前面的数据回滚 user0.setUserName("111110000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001");
        user0.setAge(11);
        user0.setGender(0);
        user0Mapper.insert(user0);
        return 1;
    }
}

controller

@RestController
@RequestMapping("/user")
public class UserController {

    @Resource
    private UserService userService;

    @PostMapping("/test1")
    public CommonResult test1() {
        int i = 0;
        try {
            i = userService.transaction1();
            return CommonResult.success(i);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return CommonResult.success(i);
    }

    @PostMapping("/test2")
    public CommonResult test2() {
        int i = 0;
        try {
            i = userService.transaction2();
            return CommonResult.success(i);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return CommonResult.success(i);
    }

    @PostMapping("/test3")
    public CommonResult test3() {
        int i = 0;
        try {
            i = userService.transaction3();
            return CommonResult.success(i);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return CommonResult.success(i);
    }
}

项目启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

/**
 * @author tianwc  公众号:java后端技术全栈、面试专栏
 * @version 1.0.0
 * @date 2023年05月11日 19:38
 * 博客地址:<a href="http:///">博客地址</a>
 * <p>
 * 项目启动类
 */

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
//@ComponentScan(basePackages 
= {"com.tian"})
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.classargs);
    }
}

测试

启动项目,分别测试下面三个:

http://localhost:9001/user/test1   结果:两个数据库中,表数据都新增一条

http://localhost:9001/user/test2    结果:抛出除数不能为Zero的异常,两个数据库都没有新增数据。

http://localhost:9001/user/test3    结果:抛出数据字段值太长异常,两个数据库都没有新增数据。

好了,到此我们已经实现了分布式事务。

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多