暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

分布式事务Seata,还没用过?

码酱 2021-09-07
1099

先聊聊什么是分布式事务?

首先我们回顾一下在单体应用中,例如一个业务调用了3个模块,他们都使用同一个数据源,是靠本地事务来保证事务一致性。

可随着业务量的不断增长,单体架构渐渐扛不住巨大的流量,此时就需要对数据库、表做分库分表处理,将应用 SOA 服务化拆分。也就产生了库存中心、订单中心、账户中心等

由此带来的问题就是业务间相互隔离,每个业务都维护着自己的数据库,各自都有独立的数据源。可此时我们只能保证自己本地的数据一致性,无法保证调用其他服务的操作是否成功,所以为了保证整个下单流程的数据一致性,就需要分布式事务介入。
简单概括分布式事务:
指一次大的操作由不同的小操作组成的,这些小的操作分布在不同的服务器上,分布式事务需要保证这些小操作要么全部成功,要么全部失败。从本质上来说,分布式事务就是为了保证不同数据库的数据一致性。

什么是Seata?

  • 官网http://seata.io/zh-cn/

利用Seata如何理分布式事务

Business 是业务入口,在程序中会通过注解来说明他是一个全局事务,这时他的角色为 TM(事务管理者)。Business 会请求 TC(事务协调器,一个独立运行的服务),说明自己要开启一个全局事务,TC 会生成一个全局事务ID(XID),并返回给 Business。Business 得到 XID 后,开始调用微服务,例如调用 Storage。Storage 会收到 XID,知道自己的事务属于这个全局事务。Storage 执行自己的业务逻辑,操作本地数据库。Storage 会把自己的事务注册到 TC,作为这个 XID 下面的一个分支事务,并且把自己的事务执行结果也告诉 TC。此时 Storage 的角色是 RM(资源管理者),资源是指本地数据库。Order、Account 的执行逻辑与 Storage 一致。在各个微服务都执行完成后,TC 可以知道 XID 下各个分支事务的执行结果,TM(Business) 也就知道了。Business 如果发现各个微服务的本地事务都执行成功了,就请求 TC 对这个 XID 提交,否则回滚。TC 收到请求后,向 XID 下的所有分支事务发起相应请求。各个微服务收到 TC 的请求后,执行相应指令,并把执行结果上报 TC。
重要机制:

(1)全局事务的回滚是如何实现的呢?

Seata 有一个重要的机制:回滚日志。每个分支事务对应的数据库中都需要有一个回滚日志表 UNDO_LOG,在真正修改数据库记录之前,都会先记录修改前的记录值,以便之后回滚。在收到回滚请求后,就会根据 UNDO_LOG 生成回滚操作的 SQL 语句来执行。如果收到的是提交请求,就把 UNDO_LOG 中的相应记录删除掉。

(2)RM 是怎么自动和 TC 交互的?

是通过监控拦截JDBC实现的,例如监控到开启本地事务了,就会自动向 TC 注册、生成回滚日志、向 TC 汇报执行结果。

(3)二阶段回滚失败怎么办?

例如 TC 命令各个 RM 回滚的时候,有一个微服务挂掉了,那么所有正常的微服务也都不会执行回滚,当这个微服务重新正常运行后,TC 会重新执行全局回滚。
核心组件
  • 事务协调器 TC

    维护全局和分支事务的状态,指示全局提交或者回滚。

  • 事务管理者 TM

    开启、提交或者回滚一个全局事务。

  • 资源管理者 RM

    管理执行分支事务的那些资源,向TC注册分支事务、上报分支事务状态、控制分支事务的提交或者回滚

Seata工作流程


  • TM 请求 TC,开始一个新的全局事务,TC 会为这个全局事务生成一个 XID。

  • XID 通过微服务的调用链传递到其他微服务。

  • RM 把本地事务作为这个XID的分支事务注册到TC。

  • TM 请求 TC 对这个 XID 进行提交或回滚。

  • TC 指挥这个 XID 下面的所有分支事务进行提交、回滚。

Seata环境配置

1.下载seata-server-1.4.2和seata-1.4.2源码

  • seate-server下载: https://seata.io/zh-cn/blog/download.html

  • seata-1.4.2源码下载: https://github.com/seata/seata/releases

2.创建undo_log日志表

在seata1.4.2源码seata-1.4.2\script\client\at\db目录下有提供针对mysql、oracle、postgresql这三种数据库生成undo-log逆向日志回滚表的表创建脚本。
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

在你项目参与全局事务的数据库中加入undo_log这张表。undo_log表脚本根据自身数据库类型来选择。

3.创建seata事务相关表

创建seata数据库,在seata1.4.2源码seata-1.4.2\script\server\db中运行mysql.sql脚本

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE 
= InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE 
= InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(128),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE 
= InnoDB
  DEFAULT CHARSET = utf8;

4.修改seata-server的registry.conf和file.conf

配置注册中心为nacos,配置nacos相关参数

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = "public"
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }

配置file.config的DB模式相关参数配置

## transaction log store, only used in seata-server
store {
  ## store mode: file、db、redis
  mode = "db"
  ## rsa decryption public key
  publicKey = ""
  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
    datasource 
"druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.jdbc.Driver"
    ## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param
    url = "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true"
    user = "root"
    password = "root8888"
    minConn = 5
    maxConn = 100
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }

5.修改提交nacos脚本到nacos控制台

首先运行nacos服务,然后参考:https://github.com/seata/seata/tree/develop/script/config-center 下的config.txt文件并做修改,如下示例:
service.vgroupMapping.my_test_tx_group=default
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
store.db.user=username
store.db.password=password
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000

  • config.txt提供的脚本是seata-server服务自身所需要的配置文件信息,我们把相关脚本提交到nacos中,然后方便在nacos控制台上进行修改变更了。

  • 运行仓库:https://github.com/seata/seata/tree/develop/script/config-center/nacos 中提供的nacos脚本nacos-config.sh,将以上信息提交到nacos控制台,如果有需要修改参数,可直接通过登录nacos控制台修改。

6.application.yml配置

从官方github仓库:https://github.com/seata/seata/tree/develop/script/client 拿到参考配置做修改,加到你项目的application.yml文件中
#Seata分布式事务配置(AT模式)
seata:
  enabled: true
  application-id: ${spring.application.name}
  #客户端和服务端在同一个事务组
  tx-service-group: my_test_tx_group
  enable-auto-data-source-proxy: true
  service:
    vgroup-mapping:
      my_test_tx_group: default
  config:
    type: nacos
    nacos:
      namespace: "public"
      serverAddr: 127.0.0.1:8848
      group: SEATA_GROUP
      username: "nacos"
      password: "nacos"
  #服务注册到nacos
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      group: SEATA_GROUP
      namespace: "public"
      username: "nacos"
      password: "nacos"
      cluster: default

7.启动seata服务

启动运行seata-server,成功后,运行自己的服务提供者,服务调用者。在全局事务调用者(发起全局事务的服务)的接口上加入@GlobalTransactional注解。通过nacos,我们能看到seata成功注册。

到此,我们的seata服务算是配置完了,并正常启动。

Seata入门案例

1.项目结构

|-- cloud-seata       (父级工程)
    |-- cloud-api     (基础工程)
    |-- cloud-storage (库存服务)
    |-- cloud-order   (订单服务)
    |-- cloud-account (账户服务)
    |-- cloud-business(业务服务)

2.seata主要依赖

<!-- seata依赖 -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <version>${cloud.version}</version>
</dependency>

3.cloud-api提供的对外接口

//订单服务
public interface OrderService {

    /**
     * 创建订单
     */

    Order create(String userId, String commodityCode, int orderCount);
}
//账户服务
public interface AccountService {

    /**
     * 从用户账户中借出
     */

    void debit(String userId, int money);
}
//库存服务
public interface StorageService {

    /**
     * 扣除存储数量
     */

    void deduct(String commodityCode, int count);
}

4.cloud-order服务中order-serviceImpl

@DubboService
public class OrderServiceImpl implements OrderService {

    @Autowired
    private OrderMapper orderMapper;
    
    @DubboReference
    private AccountService accountService;

    @Override
    public Order create(String userId, String commodityCode, int orderCount) {
        int orderMoney = calculate(commodityCode, orderCount);
        //账户减少余额
        accountService.debit(userId, orderMoney);
        Order order = new Order();
        order.setUserId(userId)
                .setCommodityCode(commodityCode)
                .setOrderCount(orderCount)
                .setMoney(orderMoney);
        //创建订单
        orderMapper.insert(order);
        return order;
    }

    private int calculate(String commodityCode, int orderCount) {
        //getPriceByCommodityCode(commodityCode);//查询商品服务得到商品单价
        int price = 10;
        return price*orderCount;
    }
}

5.cloud-business服务businessService

@Service
public class BusinessService {

    @DubboReference
    private OrderService orderService;
    @DubboReference
    private StorageService storageService;

    /**
     * 购买
     * @param userId
     * @param commodityCode
     * @param orderCount
     */

    @GlobalTransactional(rollbackFor = Exception.class)
    public void purchase(String userIdString commodityCodeint orderCount)
{
        //扣减库存
        storageService.deduct(commodityCode,orderCount);
        //创建订单
        orderService.create(userId,commodityCode,orderCount);
    }
}

5.新建三张数据库表分别为account.account,storage.storage,order.order

6.businessController

@GetMapping("/purchase")
public String purchase(){
    businessService.purchase("001","001",10);
    return "下单成功"+"商品【001】x 10";
}

7.postman测试,查看数据库

可以看到订单库新增一条订单,库存表001商品库存数量减10,账户表001用户余额减少10*10=100。

7.增加异常测试,查看数据库(由于 Seata 删除回滚日志的速度很快,所以要想在表中看见回滚日志,必须要在某一个服务上打断点才看的更明显。

回滚记录

可以看到发生异常后

数据还是没有发生变化,放行断点,再去查看seata回滚日志,已经被seata删除了。


更多seata学习样例请参考https://github.com/seata/seata-samples



喜欢就加个关注吧,


往期精选

#SpringCloudAlibaba系列
#SpringCloudNetflix系列



文章转载自码酱,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论