我们在实际项目中,尽量规避分布式事务。但是,有些时候是真的需要做一些服务拆分从而会引出分布式事务问题。
同时,分布式事务也是面试中市场被问,可以拿着这个案例练练手,面试就可以说上个123了。
这里举个业务板栗:用户领取优惠券,需要扣减用户领取次数,然后记录一个用户领取优惠券记录。
拆分前拆分后原本这里可以使用消息队列方式,采用异步化去新增用户领取记录。但是,这里需求是就是需要用户领完立马就能查看到自己的领取记录,那我们这里就引入了Atomikos
来实现分布式事务问题。
分布式事务
分布式事务是指跨越多个计算机或数据库的事务,这些计算机或数据库之间可能存在网络延迟、故障或不一致性的情况。分布式事务需要保证所有操作的原子性、一致性、隔离性和持久性,以确保数据的正确性和完整性。
分布式事务协议有哪些?
分布式事务协议主要有两种:2PC(Two-Phase Commit)和3PC(Three-Phase Commit)。
2PC是目前最常用的分布式事务协议,其流程分为两个阶段:准备阶段和提交阶段。在准备阶段,事务协调者向所有参与者发出准备请求,参与者将本地事务执行到prepare状态,并将prepare结果返回给事务协调者。在提交阶段,如果所有参与者都执行成功,则事务协调者向所有参与者发出提交请求,参与者将本地事务提交,否则事务协调者向所有参与者发出回滚请求,参与者将本地事务回滚。
3PC是2PC的改进版,其在2PC的基础上增加了一个准备提交阶段。在准备提交阶段,协调者向参与者询问是否可以提交,如果参与者返回同意,则在提交阶段直接提交,否则在提交阶段回滚。
分布式事务常见解决方案有哪些?
分布式事务解决实现方案有:
- 基于消息队列的分布式事务方案(如RocketMQ的开源方案)
- 基于分布式事务框架的分布式事务方案(如Seata、TCC-Transaction等框架)
- 基于可靠消息最终一致性的分布式事务方案(如阿里巴巴的分布式事务中间件GTS)
- 基于CAP原理的分布式事务方案(如CQRS架构中的事件溯源模式)
什么是JTA ?
JTA(Java Transaction API),是J2EE的编程接口规范,它是XA协议的JAVA实现。它主要定义了:
一个事务管理器的接口javax.transaction.TransactionManager
,定义了有关事务的开始、提交、撤回等>操作。
一个满足XA规范的资源定义接口javax.transaction.xa.XAResource
,一种资源如果要支持JTA事务,就需要让它的资源实现该XAResource
接口,并实现该接口定义的两阶段提交相关的接口。
如果我们有一个应用,它使用JTA接口实现事务,应用在运行的时候,就需要一个实现JTA的容器,一般情况下,这是一个J2EE容器,像JBoss,Websphere等应用服务器。
但是,也有一些独立的框架实现了JTA,例如Atomikos, bitronix都提供了jar包方式的JTA实现框架。这样我们就能够在Tomcat或者Jetty之类的服务器上运行使用JTA实现事务的应用系统。
在上面的本地事务和外部事务的区别中说到,JTA事务是外部事务,可以用来实现对多个资源的事务性。它正是通过每个资源实现的XAResource
来进行两阶段提交的控制。感兴趣的同学可以看看这个接口的方法,除了commit, rollback等方法以外,还有end()
, forget()
, isSameRM()
, prepare()
等等。光从这些接口就能够想象JTA在实现两阶段事务的复杂性。
什么是XA?
XA是由X/Open组织提出的分布式事务的架构(或者叫协议)。XA架构主要定义了(全局)事务管理器(Transaction Manager)和(局部)资源管理器(Resource Manager)之间的接口。XA接口是双向的系统接口,在事务管理器(Transaction Manager)以及一个或多个资源管理器(Resource Manager)之间形成通信桥梁。也就是说,在基于XA的一个事务中,我们可以针对多个资源进行事务管理,例如一个系统访问多个数据库,或即访问数据库、又访问像消息中间件这样的资源。这样我们就能够实现在多个数据库和消息中间件直接实现全部提交、或全部取消的事务。XA规范不是java的规范,而是一种通用的规范, 目前各种数据库、以及很多消息中间件都支持XA规范。
JTA是满足XA规范的、用于Java开发的规范。所以,当我们说,使用JTA实现分布式事务的时候,其实就是说,使用JTA规范,实现系统内多个数据库、消息中间件等资源的事务。
什么是Atomikos
Atomikos是一个非常流行的开源事务管理器,并且可以嵌入到你的Spring Boot应用中。Tomcat应用服务器没有实现JTA规范,当使用Tomcat作为应用服务器的时候,需要使用第三方的事务管理器类来作为全局的事务管理器,而Atomikos框架就是这个作用,将事务管理整合到应用中,而不依赖于application server。
Spring Boot 集成Atomikos
说一堆的理论没什么用,show me the code。
技术栈:Spring Boot+MyBatis+Atomikos+MySQL
如果你按照本文代码,注意你的mysql版本。
首先建好两个数据库(my-db_0和my-db_1),然后每个库里各建一张表。
数据库my-db_0中:
CREATE TABLE `t_user_0` (
`id` bigint NOT NULL AUTO_INCREMENT,
`user_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`age` int NOT NULL,
`gender` int NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8;
数据库my-db_1中:
CREATE TABLE `t_user_1` (
`id` bigint NOT NULL AUTO_INCREMENT,
`user_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`age` int NOT NULL,
`gender` int NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8;
这里只是为了演示分布式事务,不用在意表的具体含义。
整体项目结构
项目整体结构maven配置
<project xmlns="http://maven./POM/4.0.0" xmlns:xsi="http://www./2001/XMLSchema-instance"
xsi:schemaLocation="http://maven./POM/4.0.0 http://maven./xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tian</groupId>
<artifactId>spring-boot-atomikos</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>
<name>spring-boot-atomikos</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- mybatis依赖 -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- mysql依赖 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>
<!--分布式事务-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 要使生成的jar可运行,需要加入此插件 -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/java</directory>
<excludes>
<exclude>**/*.java</exclude>
</excludes>
</resource>
<resource>
<!-- 编译xml文件 -->
<directory>src/main/resources</directory>
<includes>
<include>**/*.*</include>
</includes>
</resource>
</resources>
</build>
</project>
properties配置
server.port=9001
spring.application.name=atomikos-demo
spring.datasource.user0.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.user0.url=jdbc:mysql://localhost:3306/my-db_0?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true
spring.datasource.user0.user=root
spring.datasource.user0.password=123456
spring.datasource.user1.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.user1.url=jdbc:mysql://localhost:3306/my-db_1?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true
spring.datasource.user1.user=root
spring.datasource.user1.password=123456
mybatis.mapperLocations=classpath:/com/tian/mapper/*/*.xml
mybatis.typeAliasesPackage=com.tian.entity
mybatis.configuration.cache-enabled=true
数据源
/**
* @author tianwc 公众号:java后端技术全栈、面试专栏
* @version 1.0.0
* @date 2023年05月11日 19:38
* 博客地址:<a href="http:///">博客地址</a>
* <p>
* 配置好两个数据源
*/
@Configuration
public class DataSourceConfig {
// 将这个对象放入spring容器中(交给Spring管理)
@Bean
// 读取 application.yml 中的配置参数映射成为一个对象
@ConfigurationProperties(prefix = "spring.datasource.user0")
public XADataSource getDataSource0() {
// 创建XA连接池
return new MysqlXADataSource();
}
/**
* 创建Atomikos数据源
* 注解@DependsOn("druidXADataSourcePre"),在名为druidXADataSourcePre的bean实例化后加载当前bean
*/
@Bean
@DependsOn("getDataSource0")
@Primary
public DataSource dataSourcePre(@Qualifier("getDataSource0") XADataSource xaDataSource) {
//这里的AtomikosDataSourceBean使用的是spring提供的
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setXaDataSource(xaDataSource);
atomikosDataSourceBean.setMaxPoolSize(20);
return atomikosDataSourceBean;
}
@Bean
@ConfigurationProperties(prefix = "spring.datasource.user1")
public XADataSource getDataSource1() {
// 创建XA连接池
return new MysqlXADataSource();
}
@Bean
@DependsOn("getDataSource1")
public DataSource dataSourceSit(@Qualifier("getDataSource1") XADataSource xaDataSource) {
//这里的AtomikosDataSourceBean使用的是spring提供的
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setXaDataSource(xaDataSource);
return atomikosDataSourceBean;
}
}
MyBatis扫描
@Configuration
@MapperScan(basePackages = {"com.tian.mapper.user0"}, sqlSessionTemplateRef = "preSqlSessionTemplate")
public class MybatisPreConfig {
@Autowired
@Qualifier("dataSourcePre")
private DataSource dataSource;
/**
* 创建 SqlSessionFactory
*/
@Bean
@Primary
public SqlSessionFactory preSqlSessionFactory() throws Exception{
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
bean.setMapperLocations(new PathMatchingResourcePatternResolver().
getResources("classpath*:com/tian/mapper/user0/*.xml"));
return bean.getObject();
}
/**
* 通过 SqlSessionFactory 来创建 SqlSessionTemplate
*/
@Bean
@Primary
public SqlSessionTemplate preSqlSessionTemplate(@Qualifier("preSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
// SqlSessionTemplate是线程安全的,可以被多个DAO所共享使用
return new SqlSessionTemplate(sqlSessionFactory);
}
}
另外一个基本一样,就是扫描路径改成:
("classpath*:com/tian/mapper/user1/*.xml")
mapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-////DTD Mapper 3.0//EN" "http:///dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.tian.mapper.user0.User0Mapper">
<!-- -->
<cache eviction="LRU" flushInterval="10000" size="1024" />
<resultMap id="BaseResultMap" type="com.tian.entity.User0">
<id column="id" jdbcType="BIGINT" property="id" />
<result column="user_name" jdbcType="VARCHAR" property="userName" />
<result column="age" jdbcType="INTEGER" property="age" />
<result column="gender" jdbcType="INTEGER" property="gender" />
</resultMap>
<sql id="Base_Column_List">
id, user_name, age, gender
</sql>
<insert id="insert" parameterType="com.tian.entity.User0">
insert into t_user_0 (id, user_name,age, gender)
values (#{id,jdbcType=BIGINT}, #{userName,jdbcType=VARCHAR},#{age,jdbcType=INTEGER},#{gender,jdbcType=INTEGER})
</insert>
</mapper>
另外一个基本完全一样,这里就贴出来了。
对应mapper接口 也非常简单,贴出一个:
public interface User0Mapper {
int insert(User0 record);
}
service
/**
* @author tianwc 公众号:java后端技术全栈、面试专栏
* @version 1.0.0
* @date 2023年05月11日 19:38
* 博客地址:<a href="http:///">博客地址</a>
* <p>
* 模拟三种场景:正常、制造异常、数据库异常
*/
@Service
public class UserServiceImpl implements UserService {
@Resource
private User0Mapper user0Mapper;
@Resource
private User1Mapper user1Mapper;
/**
* 正常逻辑 同时对两个数据库进行 插入数据
*/
@Transactional
@Override
public int transaction1() throws Exception {
User1 user1 = new User1();
user1.setUserName("22222");
user1.setAge(11);
user1.setGender(0);
user1Mapper.add(user1);
System.out.println("---------------------------");
// sit(数据源1)
User0 user0 = new User0();
user0.setUserName("111111");
user0.setAge(11);
user0.setGender(0);
user0Mapper.insert(user0);
return 1;
}
/**
* 正常逻辑 同时对两个数据库进行 插入数据
* 数据插入完后 出现异常
*/
@Transactional
@Override
public int transaction2() throws Exception {
User1 user1 = new User1();
user1.setUserName("22222");
user1.setAge(11);
user1.setGender(0);
user1Mapper.add(user1);
System.out.println("---------------------------");
// sit(数据源1)
User0 user0 = new User0();
user0.setUserName("111111");
user0.setAge(11);
user0.setGender(0);
user0Mapper.insert(user0);
//认为制造一个异常
int a=1/0;
return 1;
}
/**
* 第一个数据插入成功 第二个数据插入失败
*/
@Transactional
@Override
public int transaction3() throws Exception {
User1 user1 = new User1();
user1.setUserName("22222");
user1.setAge(11);
user1.setGender(0);
user1Mapper.add(user1);
System.out.println("---------------------------");
// sit(数据源1)
User0 user0 = new User0();
//故意搞长点,模拟插入失败 让前面的数据回滚 user0.setUserName("111110000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001");
user0.setAge(11);
user0.setGender(0);
user0Mapper.insert(user0);
return 1;
}
}
controller
@RestController
@RequestMapping("/user")
public class UserController {
@Resource
private UserService userService;
@PostMapping("/test1")
public CommonResult test1() {
int i = 0;
try {
i = userService.transaction1();
return CommonResult.success(i);
} catch (Exception e) {
e.printStackTrace();
}
return CommonResult.success(i);
}
@PostMapping("/test2")
public CommonResult test2() {
int i = 0;
try {
i = userService.transaction2();
return CommonResult.success(i);
} catch (Exception e) {
e.printStackTrace();
}
return CommonResult.success(i);
}
@PostMapping("/test3")
public CommonResult test3() {
int i = 0;
try {
i = userService.transaction3();
return CommonResult.success(i);
} catch (Exception e) {
e.printStackTrace();
}
return CommonResult.success(i);
}
}
项目启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
/**
* @author tianwc 公众号:java后端技术全栈、面试专栏
* @version 1.0.0
* @date 2023年05月11日 19:38
* 博客地址:<a href="http:///">博客地址</a>
* <p>
* 项目启动类
*/
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
//@ComponentScan(basePackages = {"com.tian"})
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
测试
启动项目,分别测试下面三个:
http://localhost:9001/user/test1
结果:两个数据库中,表数据都新增一条
http://localhost:9001/user/test2
结果:抛出除数不能为Zero的异常,两个数据库都没有新增数据。
http://localhost:9001/user/test3
结果:抛出数据字段值太长异常,两个数据库都没有新增数据。
好了,到此我们已经实现了分布式事务。