Seata(二)分布式事务Seata集成实践

分布式事务
placeholder image
admin 发布于:2023-02-04 21:42:01
阅读:loading

1.示例说明

前文介绍了什么是Seata以及Seata Server的搭建和集群部署,本篇文章则使用IDEA创建分模块的项目来实践一下基于AT模式的分布式事务。

本次示例构建在一个多模块的项目下,两个业务功能模块seata-users和seata-bless分别提供了操作数据库的HTTP Restful接口,使用主程序模块seata-bootstrap来分别调用这俩微服务模块的接口,模拟常规异常场景来验证两个不同的数据库下的数据是否能正确提交和回滚,同时提交或回滚。

1.1 项目结构

(1)seata是整个项目名称,起到定义使用框架版本信息和具体子模块的功能;

(2)seata-base是所有业务功能的子模块,定义各个模块的公用组件依赖和公共处理类;

(3)seata-bootstrap是项目的主启动模块,定义Spring Boot程序的启动和API调用模块;

(4)seata-users是业务功能模块,定义用户相关的模块功能;

(5)seata-bless是业务功能模块,随便定义另外的一块业务功能(由于实践阶段是过年正当时,起名bless寓意祝福和好运吧);

1.2 项目模块

(1)seata-base独立,不依赖项目其它模块,参考maven pom.xml依赖(此处坐标仅包含seata-base模块的公共依赖,springboot版本为2.4.2;springcloud版本为2020.0.1;springcloud Alibaba版本为2021.1)如下:

<dependencies>
    <!-- Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Nacos注册中心 -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    <!-- 监控中心 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!-- 接口服务 -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-openfeign</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-loadbalancer</artifactId>
        <version>3.1.5</version>
    </dependency>
    <!-- MySQL数据库驱动 -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.11</version>
    </dependency>
    <!-- Druid连接池 -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.16</version>
    </dependency>
    <!-- MyBatisPlus -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.3.1</version>
    </dependency>
    <!-- lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.6</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/io.seata/seata-spring-boot-starter -->
    <dependency>
        <groupId>io.seata</groupId>
        <artifactId>seata-spring-boot-starter</artifactId>
        <version>1.6.1</version>
    </dependency>

</dependencies>

(2)seata-users和seata-bless均依赖seata-base;

(3)seata-bootstrap依赖seata-users和seata-bless两个模块;

1.3 项目实现

(1)上述模块的定义中seata-bootstrap微服务端口号为8060;seata-bless微服务端口为8050;seata-users微服务端口为8040;后面两个微服务提供数据保存的Restful接口;

(2)seata-users连接的是本地数据库chendd-examples;seata-bless连接的是本站远程数据库Struts,即两个不同机器的MySQL数据库

(3)seata-bootstrap微服务模块使用Open Feign连接seata-users、seata-bless模块,实现接口的交互,均集成Nacos,从服务的注册中心调用

(4)示例调用代码在seata-bootstrap微服务,分别提供了3个测试接口:

   A:微服务seata-users的接口调用成功,微服务seata-bless的接口调用成功。二者接口均调用成功,事务提交验证:全部提交;

   B:微服务seata-users的接口调用成功,微服务seata-bless的接口调用失败(先保存数据再手动抛出运行时异常),事务提交验证:全部回滚

   C:微服务seata-users的接口调用成功,微服务seata-bless的接口调用成功。二者接口均调用成功,但在二者接口调用完毕后再手动抛出运行时异常,事务提交验证:全部回滚;

(5)seata的AT模式的分布式事务要求所有客户端接入的数据库均需要创建“undo_log”表,即seata-users和seata-bless两个微服务模块连接的数据源需要创建undo_log表,建表语句的SQL参见:https://seata.io/zh-cn/docs/user/quickstart.html

2.示例代码

image.png

(示例代码包结构)

package cn.chendd.bootsrtap.service.impl;

import cn.chendd.bootsrtap.client.GoodLuckClient;
import cn.chendd.bootsrtap.client.UsersClient;
import cn.chendd.bootsrtap.service.TransactionalService;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * 验证事务接口实现
 *
 * @author chendd
 * @date 2023/1/20 16:29
 */
@Service
public class TransactionalServiceImpl implements TransactionalService {

    @Resource
    private UsersClient usersClient;
    @Resource
    private GoodLuckClient goodLuckClient;

    @Override
    @GlobalTransactional(rollbackFor = Exception.class)
    public void saveDataCommit() {
        System.out.println("TX_XID--->" + RootContext.getXID());
        //两个接口保存成功,事务提交
        this.usersClient.insertSuccess();
        this.goodLuckClient.insertSuccess();
    }

    @Override
    @GlobalTransactional(rollbackFor = Exception.class)
    public void saveDataRollback1() {
        //第一个保存成功,第二个保存失败,事务回滚
        System.out.println("TX_XID--->" + RootContext.getXID());
        this.usersClient.insertSuccess();
        this.goodLuckClient.insertException();
        System.out.println("TransactionalServiceImpl.saveDataRollback1");
    }

    @Override
    @GlobalTransactional(rollbackFor = Exception.class)
    public void saveDataRollback2() {
        //第一个保存成功,第二个保存成功,但后续逻辑出现运行时异常,事务回滚
        System.out.println("TX_XID--->" + RootContext.getXID());
        this.usersClient.insertSuccess();
        this.goodLuckClient.insertSuccess();
        System.out.println("TransactionalServiceImpl.saveDataRollback2---" + (1 / 0));
    }

}

以上是Service接口的实现定义,UsersClient和GoodLuckClient均是Open Feign对应的接口示例,参考代码如下:

/***seata-users接口调用客户端***/

package cn.chendd.bootsrtap.client;

import cn.chendd.bootsrtap.SeataXidRequestInterceptor;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;

/**
 * 用户管理 接口调用
 * @author chendd
 * @date 2023/1/23 23:47
 */
@FeignClient(name = "chendd-seata-users" , path = "/seata-users/api" , configuration = SeataXidRequestInterceptor.class)
public interface UsersClient {

    /**
     * 插入数据成功
     */
    @GetMapping("/success")
    void insertSuccess();
}

/***seata-bless接口调用客户端***/
package cn.chendd.bootsrtap.client;

import cn.chendd.bootsrtap.SeataXidRequestInterceptor;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;

/**
 * 新年好运 接口调用
 * @author chendd
 * @date 2023/1/23 23:47
 */
@FeignClient(name = "chendd-seata-bless" , path = "/seata-bless/api" , configuration = SeataXidRequestInterceptor.class)
public interface GoodLuckClient {

    /**
     * 插入数据成功
     */
    @GetMapping("/success")
    void insertSuccess();

    /**
     * 插入数据失败
     */
    @GetMapping("/exception")
    void insertException();
}

3.运行效果

image.png

(启动控制台输出效果)

image.png

(分布式事务提交)

image.png

(分布式事务回滚)

4.其它说明

(1)seata控制台的访问端口默认为7091,seata-server的默认端口为“控制台端口号 + 1000”,若配置则以配置的为准;控制台默认账户和密码均为seata;

(2)上述示例中的分布式事务连接的192.168.244.138服务,第三次再调用分布式事务回滚验证的时候触发的是192.168.244.139服务,说明连接的多个Seata-Server可用,参考139服务的回滚截图:

image.png

(3)Nacos Server注册中心截图参考如下:

image.png

(4)错误一:事务失效,细究原因未获取到TXID(源码中保留了几处System.out.println打印TX_XID变量的代码),网上找的说将springboot-starter切换为springcloud的坐标即可,但可能是版本未匹配,经过尝试后未生效。后通过增加feign的拦截器,增加header后解决,可参考GoodLuckClient代码中的拦截器配置,参考代码如下:

package cn.chendd.bootsrtap;

import feign.RequestInterceptor;
import feign.RequestTemplate;
import io.seata.core.context.RootContext;

/**
 * Seata事务的txId携带
 *
 * @author chendd
 * @date 2023/1/24 17:20
 */
public class SeataXidRequestInterceptor implements RequestInterceptor {

    @Override
    public void apply(RequestTemplate template) {
        boolean xid = template.headers().containsKey(RootContext.KEY_XID);
        if (! xid) {
            template.header(RootContext.KEY_XID , RootContext.getXID());
        }
    }
}

(5)错误二:抛出ShouldNeverHappenException异常,具体描述为“get table meta failed, please check whether the table 表名称 exists.”。该异常的抛出逻辑是根据表查询表的元数据信息并进行缓存,经过源码的调试分析发现表的创建不够规范(随意拿着一个历史表使用的),设置主键后解决;

(6)配置了seata集群后,若某一台服务器的seata关闭后或异常宕机后,控制台将输出INFO日志(AbstractNettyRemotingClient$ClientHandler#0, [id: 0x1fc2af15, L:/192.168.244.1:61289 ! R:/192.168.244.138:8091]) will closed,程序的正常运行将提供正常的支持,说明集群配置有效;

(7)程序框架的范围:Spring Cloud Seata + Mybatis-Plus + Open Feign + Nacos + MySQL;

(8)关于建表语句比较简单,可转至《分布式事务Atomikos介绍与实践》所对应的源码中查找;

(9)Seata的AT模式在事务支持的SQL实现上有很多限制条件,并不是所有对数据库的操作都支持回滚,这点需要格外注意官网资料需多看看,倒是觉得TCC模式的事务还是很不错的;

(10)项目源码:源码下载.txt



 点赞


 发表评论

当前回复:作者

 评论列表


留言区