暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Seata自定义AOP事务控制器

Coding On Road 2021-02-05
792

Seata做为alibaba的组件之一,负责在微服务架构中实现分布式事务。之前写过一篇关于seata分布式事务的文章,但并没有应用于真实的生产环境。此次开发全国核检测APP,使用seata进行分布式事务的管理,主要是在支付、订单即关于现金业务的地方大多使用分布式事务。因为订单微服务与支付微服务分别进行部署,所以,必须要使用分布式事务。

      在以下的配置中,使用的Seata的AT模式。


1、首先你需要确定的事

1)、如果你所依赖的是spring boot starter 2.3.x则建议使用seata1.3.x。获取版本对应关系,请通过https://start.aliyun.com创建一个springboot项目,并添加nacos,nacos-config,seata依赖,并查看其版本依赖。


选择所有依赖以后,查看2.3.x依赖的seata的版本,可见使用spring starter 2.3.x时,依赖的seata为1.3.0



2)、如果使用nacos做为seata的配置和注册中心,除了添加nacos-discovery依赖之外必须要添加nacos-config的依赖。因为在项目启动时,seata需要从nacos中读取seata的配置信息。

<!-- seata的依赖,通过查看,spring-starter-2.3.7使用的是seata1.3.0 -->
<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<!--seata事务配置完成-->

3)、由于在项目启动时,就会自动读取nacos配置中心的数据,所以,建议使用bootstrap.yml做为配置文件。因为从读取顺序上讲,boostrap.yml最先读取。


2、开始配置


步1、搭建nacos环境

  一般情况下,如果只有一台nacos服务器,在启动时可以选择使用standalone模式,即:nacos/bin/startup.bat -m standalone。但在本质上,一台主机,也可以模拟一个集群,只需要修改nacos/conf/cluster.conf即可。本处修改如下:

192.168.0.177:35848

(我项目中,使用的端口为35848,可根据你的实际情况进行修改)。

修改nacos/conf/application.properties文件,配置为使用mysql数据库:

server.servlet.contextPath=/nacos
server.port=35848
spring.datasource.platform=mysql
db.num=1
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos2?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai
db.user.0=root
db.password.0=123456


将nacos/conf/nacos-mysql.sql文件中的SQL语句,在连接的数据库上创建表表。

现在就可以启动nacos了,执行nacos/bin/startup.bat

"nacos is starting with cluster"

        ,--.
      ,--.'|

  ,--,: : |                                           Nacos 1.4.1
,`--.'`|  ' :                       ,---.               Running in cluster mode, All function modules
|   : : | |                      '   ,'\   .--.--.   Port: 35848
:   |   \ | : ,--.--.     ,---.     |    '   Pid: 4928
|   : ' '; |       \       \.   ; ,. :| : `./   Console: http://192.168.0.177:35848/nacos/index.html
'   ' ;.   ;.--. .-. |   ''   | |: :| : ;_
|   | | \   | \__\/: . ..    ' '   | .; : \ \    `.     https://nacos.io
'   : | ; .' ," .--.; |'   ; :__|   :   | `----.   \
|   | '`--' ,. |'   | '.'|\   \ `--'
'   : |     ; :   .'   \   :   : `----' '--'.    
;   |.'     | ,     .-./\   \           `--'---'
'---'        `--`---'     `----'

2021-02-05 08:28:44,347 INFO The server IP list of Nacos is [192.168.0.177:35848]
2021-02-05 08:28:53,691 INFO Nacos started successfully in cluster mode. use external storage


步2、配置seata

将seata-server-1.3.0.zip解压到任意目录下(建议没有中文没有空格的目录)。


1)修改seata/conf/file.conf如下,以下配置,为服务seata的server,即server所使用的本个表,seata有三个角色:server(服务器),tm(事务管理中心),client(客户端)。

其他的都删除即可,只配置db即可,建议给seata设置一个独立的数据库:
## transaction log store, only used in seata-server
store {
 ## store mode: file、db、redis
 mode = "db"
 ## database store property
 db {
   ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
   datasource = "druid"
   ## mysql/oracle/postgresql/h2/oceanbase etc.
   dbType = "mysql"
   driverClassName = "com.mysql.cj.jdbc.Driver"
   url = "jdbc:mysql://127.0.0.1:3306/seata13?characterEncoding=UTF-8&serverTimeZone=Asia/Shanghai"
   user = "root"
   password = "123456"
   minConn = 5
   maxConn = 30
   globalTable = "global_table"
   branchTable = "branch_table"
   lockTable = "lock_table"
   queryLimit = 100
   maxWait = 5000
}
}


2)配置seata的注册中心和配置中心,修改seata/conf/registry.conf文件,如下:

#删除其他没有用的配置即可,只需要保留nacos的配置,其中registry为注册中心,config为配置中心
registry {
 # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
  application = "seata-server"
  serverAddr = "127.0.0.1:35848"
  group = "SEATA_GROUP"
  namespace = "public"
  cluster = "default"
  username = "nacos"
  password = "nacos"
 }
}
config {
 # file、nacos 、apollo、zk、consul、etcd3
type = "nacos"
nacos {
  serverAddr = "127.0.0.1:35848"
  namespace = "public"
  group = "SEATA_GROUP"
  username = "nacos"
  password = "nacos"
 }
}


3)创建seata server所需要的表,下载seata-1.3.0的源代码,在script/server/db/mysql.sql中找到需要执行的sql在file.conf中指定的数据库中,创建上述的表,主要是三个表,请自行查看myhsql.sql中的表结构。


4)将所有配置导入到nacos

在script/config-center中修改config.txt文件,由于这儿我们配置的是db模式,所以其他没有用的模式可以直接删除:

#注意,如果直接使用此文件,请将所有#注释行删除后再导入到nacos
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
#注意其中的xiaoyi_tx_group,此值为唯一的事务分组,在client端,即在springboot项目中,必须设置的
#seata.tx-service-group=xiaoyi_tx_group相同
service.vgroupMapping.xiaoyi_tx_group=default
#后期我在启动时,将使用38091端口,做为seata的端口,默认端口为8091,另,建议配置的服务地为ip路由地址而非127.0.0.1
service.default.grouplist=192.168.0.177:38091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
#只保留了db模式,其他模式全部删除即可
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.cj.jdbc.Driver
store.db.url=jdbc:mysql://192.168.0.177:3306/seata13?useUnicode=true&serverTimezone=Asia/Shanghai
store.db.user=root
store.db.password=123456
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

5) 现在我们将上面的所有导入到nacos的配置中

在windows上,打开git命令行窗口,此窗口可以执行.sh的文件,执行如下:

进入script/config-center/nacos中执行查看帮助:

admin@DESKTOP-0SUC7JT MINGW64 e/downloads/seata-1.3.0/script/config-center/nacos
$ ./nacos-config.sh -?
USAGE OPTION: ./nacos-config.sh [-h host] [-p port] [-g group] [-t tenant] [-u username] [-w password]

其中 -t tenant为指定命名空间。

#建议,不要指定-t参数,建议使用默认public命名空间即可,以下只是为了将代码放到这儿再执行一次。如果导入一个指定的命名空间,请先在nacos上创建这个命名空间:
admin@DESKTOP-0SUC7JT MINGW64 e/downloads/seata-1.3.0/script/config-center/nacos
$ ./nacos-config.sh -h 192.168.0.177 -p 35848 -t seata -u nacos -w nacos2020
set nacosAddr=192.168.0.177:35848
set group=SEATA_GROUP
Set transport.type=TCP successfully
Set transport.server=NIO successfully
Set transport.heartbeat=true successfully
Set transport.enableClientBatchSendRequest=false successfully
Set transport.threadFactory.bossThreadPrefix=NettyBoss successfully
Set transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker successfully
Set transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler successfully
Set transport.threadFactory.shareBossWorker=false successfully
Set transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector successfully
Set transport.threadFactory.clientSelectorThreadSize=1 successfully
Set transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread successfully
Set transport.threadFactory.bossThreadSize=1 successfully
Set transport.threadFactory.workerThreadSize=default successfully
=========================================================================
Complete initialization parameters, total-count:66 , failure-count:0
=========================================================================
Init nacos config finished, please start seata-server.

注意,导入完成以后,必须要对store.db.url值的url进行一次修改,因为导入时&符号后台的参数将不会保存到nacos上,所以需要修改一下:jdbc:mysql://192.168.0.177:3306/seata13?useUnicode=true&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8

6)现在就可以启动seata了

由于上面的配置中,指定了seata的端口,所以启动时请添加-p 38091端口参数

seata-server.bat -p 38091

7)现在查看一下nacos中的所有服务,应该会发现seata-server已经注册进来了

服务名分组名称集群数目实例数健康实例数触发保护阈值操作
seata-serverSEATA_GROUP111false


步3、开发项目并使用seata分布式事务

这儿需要添加的主要配置包括:

  1)、nacos-config 用于让seata读取配置中心的数据。

  2)、nacos-discovery 用于将自己的项目注册到nacos中心。

  3)、starter-alibaba-seata用于依赖seata。

主要的依赖如下,其他的依赖略。

<!-- seata的依赖,通过查看,spring-starter-2.3.7使用的是seata1.3.0 -->
<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<dependency>
          <groupId>com.alibaba.cloud</groupId>
          <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
      </dependency>


完整的配置文,bootstrap.yml如下:

server:
port: 6200
servlet:
  encoding:
    charset: UTF-8
spring:
application:
  name: xiaoyi-detect
profiles:
  active: dev
cloud:
  nacos:
    discovery:
      namespace: public
      group: DEFAULT_GROUP
      server-addr: localhost:35848
      username: nacos
      password: nacos2020
     #注意,由于依赖的nacos-config,如果这儿不配置,将默认为localhost:8848,所,即便是本项目没有从nacos上读取配置,也需要配置此项目
    config:
      namespace: public
      group: DEFAULT_GROUP
      server-addr: localhost:35848
      username: nacos
      password: nacos2020
datasource:
  driver-class-name: com.mysql.cj.jdbc.Driver
  hikari:
    maximum-pool-size: 10
    minimum-idle: 3
  password: wang@907
  type: com.zaxxer.hikari.HikariDataSource
  url: jdbc:mysql://localhost:57732/detect?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
  username: wang@907
jackson:
  date-format: yyyy-MM-dd HH:mm:ss
  time-zone: GMT+08
redis:
  database: 1
  host: localhost
  password: xiaoyi@907()!@
  port: 47387
servlet:
  multipart:
    max-file-size: 10MB
    max-request-size: 100MB
#openfeign配置
feign:
hystrix:
   #开启熔断能力,必须要开启此能力
  enabled: true
client:
  config:
    default:
      connect-timeout: 10000
      readTimeout: 600000
mybatis:
configuration:
  call-setters-on-nulls: true
type-aliases-package: com.xiaoyi.**.entity,com.xiaoyi.**.form
swagger:
enabled: false
validate:
permission:
  enabled: true
token:
  enabled: true
#seata的相关配置
seata:
enabled: true
application-id: ${spring.application.name}
 #指定与seata导入的配置相同的事务名称
tx-service-group: xiaoyi_tx_group
enable-auto-data-source-proxy: true
config:
  type: nacos
  nacos:
    server-addr: 127.0.0.1:35848
    username: nacos
    password: nacos2020
    namespace: public
    group: SEATA_GROUP
#建议将日志配置放到最后
logging:
file:
  name: ./logs/${spring.application.name}.log
  path: ./logs
level:
  root: INFO
  com.xiaoyi: debug
  com.alibaba.nacos: warn
  org.mybatis: debug


开发AOP切面:

package com.xiaoyi.config.aop.tx;
import com.google.common.base.Throwables;
import com.xiaoyi.common.lang.XiaoYiException;
import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.core.model.GlobalStatus;
import io.seata.tm.api.GlobalTransaction;
import io.seata.tm.api.GlobalTransactionContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
* 分布式事务切面处理
*
* @author wj
* @since 1.0 2021-02-04
*/
@Slf4j
@Aspect
@Component
public class TransactionAop implements ApplicationContextAware {
   private String applicationName;
   @Pointcut("execution(* com.xiaoyi..*Service.*(..)) && @annotation(io.seata.spring.annotation.GlobalTransactional)")
   public void cut() {
  }
   @Before(value = "cut()")
   public void before(JoinPoint jp) throws TransactionException {
       log.info(applicationName + "-----------------开始事务拦截-----------------");
       Signature signature = jp.getSignature();
       if (signature instanceof MethodSignature) {
           MethodSignature ms = (MethodSignature) signature;
           String name = ms.getMethod().getName();
           log.info(applicationName + ",拦截到的方法名为:" + name);
      }
       GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
       // 参数说明:第一个为超时时间,第二个参数可以任意名称,但必须要保证所有的分布式微服务中,使用同一个名称,以便于保证事务的唯一性
       tx.begin(1000*7, "tx1");
       log.info(applicationName + ",开始事或当前事务ID为:" + tx.getXid());
  }

   @AfterThrowing(value = "cut()", throwing = "e")
   public void afterThrowing(Throwable e) throws TransactionException {
       log.info(applicationName + "有异常信息" + e.getMessage());
       String xid = RootContext.getXID();
       log.info(applicationName + ",事务xid为:" + xid);
       if (io.seata.common.util.StringUtils.isNotEmpty(xid)) {
           log.info(applicationName + "发现有事务id,现在开始准备手工回滚");
           int code = GlobalTransactionContext.reload(xid).getStatus().getCode();
           boolean finished = code == GlobalStatus.Finished.getCode();
           log.info(applicationName + ",此事务id的状态码为:" + code + ",事务是否已经完成状态:" + finished);
           if (!finished) {
               log.info(applicationName + ",事务还没有完成,开始回滚");
               GlobalTransactionContext.reload(xid).rollback();
               log.info(applicationName + ",回滚完成");
               //获取到最初的异常
               Throwable rootCause = Throwables.getRootCause(e);
               log.info("最初的异常为:" + rootCause.getMessage() + "," + rootCause.getClass());
               throw new XiaoYiException("事务回滚原因:" + rootCause.getMessage());
          } else {
               log.info("事务已经完成,什么也不做");
          }
      }
  }
   @Override
   public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
       String name = applicationContext.getEnvironment().getProperty("spring.application.name", "");
       if (StringUtils.isNotBlank(name)) {
           this.applicationName = name;
      } else {
           this.applicationName = "无名项目";
      }
       log.info("项目名称:{}", name);
  }
}


测试事务 测试过程如下,

分别创建三个微服务 detect - >调用  Order - > 调用Pay。经测试,事务已经很好的控制:

以下是detect项目中的service,它通过feign调用了order:

package com.xiaoyi.detect.api.demo01.service;

import com.xiaoyi.common.lang.XiaoYiException;
import com.xiaoyi.common.vo.AjaxResult;
import com.xiaoyi.detect.api.demo01.mapper.Demo01Mapper;
import com.xiaoyi.detect.feign.clients.OrderFeignClient;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class Demo01Service {
   @Autowired
   private Demo01Mapper demo01Mapper;
   @Autowired
   private OrderFeignClient orderFeignClient;
   @GlobalTransactional
   public void save(String name, Integer age) {
       String xid = RootContext.getXID();
       log.info("事务ID为:{}", xid);
       String addr = name;
       AjaxResult save = orderFeignClient.save(addr, "139" + age);//调用feign
       if(!save.isSuccess()){
           throw new XiaoYiException(save.getMsg());
      }
       log.info("调用order的结果为:"+save);
       name += ",事务ID为:" + xid;
       int rows = demo01Mapper.save(name, age);
       log.info("写入数量 {} 行", rows);
  }
}


以下是order中的service,它又调用了pay:

package com.xiaoyi.order.api.demo02.service;

import com.xiaoyi.common.lang.XiaoYiException;
import com.xiaoyi.common.vo.AjaxResult;
import com.xiaoyi.order.api.demo02.entity.Demo02;
import com.xiaoyi.order.api.demo02.mapper.Demo02Mapper;
import com.xiaoyi.order.feign.clients.pay.wechatpay.WechatPayFeignClient;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class Demo02Service {
   @Autowired
   private Demo02Mapper demo02Mapper;

   @Autowired
   private WechatPayFeignClient wechatPayFeignClient;

   @GlobalTransactional
   public boolean save(Demo02 demo){
       String xid =  RootContext.getXID();
       demo.setAddress(demo.getAddress()+",xid is:"+xid);
       int rows = demo02Mapper.insert(demo);
       log.info("写入数量:{} 行 and new 新的id为 {}",rows,demo.getId());
       if(demo.getPhone().endsWith("5")){
           throw new XiaoYiException("Order模块自己抛出的异常");
      }
       AjaxResult result = wechatPayFeignClient.save(demo.getAddress(),demo.getPhone());
       log.info("调用pay的结果:"+result);
       if(!result.isSuccess()){
           throw new XiaoYiException(result.getMsg());
      }
       return rows>0;
  }
}


pay中的代码:

package com.xiaoyi.pay.api.demo03.service;
import com.xiaoyi.common.lang.XiaoYiException;
import com.xiaoyi.pay.api.demo03.entity.Demo03;
import com.xiaoyi.pay.api.demo03.mapper.Demo03Mapper;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class Demo03Service {
   @Autowired
   private Demo03Mapper demo03Mapper;
   @GlobalTransactional
   public void save(Demo03 demo03){
       demo03Mapper.save(demo03);
       log.info("保存demo03成功,生成的id为:"+demo03.getId());
       if(demo03.getMobile().endsWith("9")){
           throw new XiaoYiException("在Pay模块中故意抛出的异常");
      }
  }
}


步4、测试运行

数据回滚过程分析

如果上一个微服务中分布式事务已经被回滚,则在调用方事务控制中,什么也不需要做:





文章转载自Coding On Road,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论