暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

tcc-transaction源码分析与思考

盛超杰的笔记 2018-05-18
176

tcc介绍

tcc是分布式事务的一种解决方案,即Try,Commit,Cancel

Try: 尝试执行业务

  1. 完成所有业务检查(一致性)


  2. 预留必须业务资源(准隔离性)

Confirm: 确认执行业务

  1. 真正执行业务


  2. 不作任何业务检查


  3. 只使用Try阶段预留的业务资源


  4. Confirm操作满足幂等性

Cancel: 取消执行业务

  1. 释放Try阶段预留的业务资源


  2. Cancel操作满足幂等性

本文我会讲解一个实现tcc思想的框架,tcc-transaction在github还是比较火的,并且应该也有公司生产使用,熟悉它的源码,一方面可以提升自己眼界,扩宽自己编码能力,另一方面,以后需要使用它的时候,难免有坑需要修改,也能为开源贡献一份力量

tcc-transaction使用

在tcc-transaction的tcc-transaction-tutorial-sample模块,做相关配置运行即可

tcc-transaction原理

这边我们主要讲解tcc-transaction和dubbo的整合 tcc-transaction的主要原理是Aop,那么以后面试的时候,问用Aop做什么,就可以回答这个了,再把tcc讲一下,完美的连招 作为一个处理分布式事务的框架,先来讲下tcc-transaction的事务抽象

Transaction

tcc的事务,并不是数据库的事务,而是应用层的事务,所以tcc这三个阶段的操作,全部都需要我们手动实现 先看下Transaction对象的参数

  1.    private TransactionXid xid;


  2.    private TransactionStatus status;


  3.    private TransactionType transactionType;


  4.    private volatile int retriedCount = 0;


  5.    private Date createTime = new Date();


  6.    private Date lastUpdateTime = new Date();


  7.    private long version = 1;


  8.    private List<Participant> participants = new ArrayList<Participant>();


  9.    private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();


参数解释
xid全局事务id,内容相当于uuid,用来保证事务唯一性
status事务的状态,可以为TRYING,CONFIRMING,CANCELLING,分别对应tcc三个阶段
transactionType事务类型,可以为ROOT,BRANCH,ROOT是主事务,BRANCH是分支事务,事务的发起方法对应主事务,其他被调用的dubbo接口在分支事务上
retriedCount事务重试次数,当confirm或者cancel失败重试时增加
createTime事务的创建时间
lastUpdateTime事务最后一次更新时间
version事务的版本号,乐观锁?
participants事务的参与者
attachments附加参数,暂时没发现被用到

再看下Transaction中两个比较重要的方法

  1. public void commit() {


  2.    for (Participant participant : participants) {

  3.        participant.commit();

  4.    }

  5. }


  6. public void rollback() {

  7.    for (Participant participant : participants) {

  8.        participant.rollback();

  9.    }

  10. }

执行Transaction的commit或rollback方法,会对应执行所有participant的commit或rollback方法

下面讲解Participant抽象

Participant

Participant是对tcc事务参与者的抽象

  1. public class Participant implements Serializable {


  2.    private static final long serialVersionUID = 4127729421281425247L;


  3.    private TransactionXid xid;


  4.    private InvocationContext confirmInvocationContext;


  5.    private InvocationContext cancelInvocationContext;


  6.    private Terminator terminator = new Terminator();


  7.    Class<? extends TransactionContextEditor> transactionContextEditorClass;


  8.    ......

  9. }

在Participant中使用InvocationContext把事务参与者的confirm和cancel方法的元数据保存下来

  1. public class InvocationContext implements Serializable {


  2.    private static final long serialVersionUID = -7969140711432461165L;

  3.    private Class targetClass;


  4.    private String methodName;


  5.    private Class[] parameterTypes;


  6.    private Object[] args;

  7. }

元数据包括目标Class,目标方法名,目标参数列表,实际参数列表

Participant也保存了transactionContextEditorClass,用于提取事务上下文

接下来看Participant的commit和rollback方法

  1. public void rollback() {

  2.        terminator.invoke(new TransactionContext(xid, TransactionStatus.CANCELLING.getId()), cancelInvocationContext, transactionContextEditorClass);

  3.    }


  4.    public void commit() {

  5.        terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);

  6.    }

会通过terminator执行具体的操作,传入构造好的TransactionContext,confirmInvocationContext和transactionContextEditorClass

  1. public Object invoke(TransactionContext transactionContext, InvocationContext invocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {



  2.        if (StringUtils.isNotEmpty(invocationContext.getMethodName())) {


  3.            try {

  4.                //从spring容器中获取对应事务参与者实现

  5.                Object target = FactoryBuilder.factoryOf(invocationContext.getTargetClass()).getInstance();


  6.                Method method = null;

  7.                //获取对应方法反射对象

  8.                method = target.getClass().getMethod(invocationContext.getMethodName(), invocationContext.getParameterTypes());

  9.                //设置上下文

  10.                FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocationContext.getArgs());

  11.                //反射调用

  12.                return method.invoke(target, invocationContext.getArgs());


  13.            } catch (Exception e) {

  14.                throw new SystemException(e);

  15.            }

  16.        }

  17.        return null;

  18.    }

在terminator的调用过程中,我们需要理解的一点的是,Participant其实分两种,第一种是本地的Participant,直接反射调用即可,第二种是远程的Participant,也就是调用的是Dubbo接口,反射调用的同时会执行远程对等端的接口逻辑,所以这里需要用到transactionContextEditorClass来设置上下文信息,传递到远程dubbo接口中去

TransactionContext

TransactionContext的保存了全局事务id和事务状态,在调用事务参与者Participant的confirm或cancel方法时会传递过去

TransactionContextEditor

  1. public interface TransactionContextEditor {


  2.    public TransactionContext get(Object target, Method method, Object[] args);


  3.    public void set(TransactionContext transactionContext, Object target, Method method, Object[] args);


  4. }

TransactionContextEditor用于调用事务参与者方法时,设置和获取需要传递的TransactionContext,目前有2种实现,DefaultTransactionContextEditor和DubboTransactionContextEditor DefaultTransactionContextEditor从方法参数里面提取TransactionContext对象

  1. @Override

  2.        public TransactionContext get(Object target, Method method, Object[] args) {

  3.            //获取TransactionContext在args中的位置

  4.            int position = getTransactionContextParamPosition(method.getParameterTypes());


  5.            if (position >= 0) {

  6.                return (TransactionContext) args[position];

  7.            }


  8.            return null;

  9.        }

DubboTransactionContextEditor从Dubbo Rpc上下文提取TransactionContext对象

  1. @Override

  2.    public TransactionContext get(Object target, Method method, Object[] args) {

  3.        //从Dubbo Rpc上下文获取

  4.        String context = RpcContext.getContext().getAttachment(TransactionContextConstants.TRANSACTION_CONTEXT);


  5.        if (StringUtils.isNotEmpty(context)) {

  6.            return JSON.parseObject(context, TransactionContext.class);

  7.        }


  8.        return null;

  9.    }

TransactionManager

TransactionManager用来控制Transaction的生命周期,Transaction的改变使用TransactionRepository同步到第三方存储,一般使用mysql数据库存储 TransactionManager包含的方法以及属性如下

变量介绍

1.transactionRepository

  1. private TransactionRepository transactionRepository;

TransactionRepository用于对Transaction的持久化操作,如果是JDBC实现,其实就是对一张Transaction表的CRUD,这张表主要用于补偿任务

2.CURRENT

  1. private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>();

这是一个双向队列,在这个类主要用作栈,用来处理事务的嵌套,因为是ThreadLocal,所以针对每个线程都是独立的

3.executorService

  1. private ExecutorService executorService;

线程池,用于异步执行commit或者cancel

方法介绍

1.registerTransaction

  1. private void registerTransaction(Transaction transaction) {


  2.    if (CURRENT.get() == null) {

  3.        CURRENT.set(new LinkedList<Transaction>());

  4.    }


  5.    CURRENT.get().push(transaction);

  6. }

把transaction设置到ThreadLocal对象中去,push方法对应入栈

2.begin

  1. public Transaction begin() {


  2.    Transaction transaction = new Transaction(TransactionType.ROOT);

  3.    transactionRepository.create(transaction);

  4.    registerTransaction(transaction);

  5.    return transaction;

  6. }

开启事务,同步到repository,注册到ThreadLocal,这个方法与用于主事务的创建

3.propagationNewBegin

  1. public Transaction propagationNewBegin(TransactionContext transactionContext) {


  2.    Transaction transaction = new Transaction(transactionContext);

  3.    transactionRepository.create(transaction);


  4.    registerTransaction(transaction);

  5.    return transaction;

  6. }

这个方法用于从主事务的上下文创建分支事务,xid保持不变,事务类型变化

4.propagationExistBegin

  1. public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException {

  2.    Transaction transaction = transactionRepository.findByXid(transactionContext.getXid());


  3.    if (transaction != null) {

  4.        transaction.changeStatus(TransactionStatus.valueOf(transactionContext.getStatus()));

  5.        registerTransaction(transaction);

  6.        return transaction;

  7.    } else {

  8.        throw new NoExistedTransactionException();

  9.    }

  10. }

这个方法用于从事务上下文同步事务状态到ThreadLocal

5.commit

  1. public void commit(boolean asyncCommit) {

  2.    //从threadlocal得到当前事务

  3.    final Transaction transaction = getCurrentTransaction();


  4.    transaction.changeStatus(TransactionStatus.CONFIRMING);

  5.    //数据库更新transaction

  6.    transactionRepository.update(transaction);


  7.    if (asyncCommit) {

  8.        try {

  9.            Long statTime = System.currentTimeMillis();

  10.            //通过线程池异步执行事务提交

  11.            executorService.submit(new Runnable() {

  12.                @Override

  13.                public void run() {

  14.                    commitTransaction(transaction);

  15.                }

  16.            });

  17.            logger.debug("async submit cost time:" + (System.currentTimeMillis() - statTime));

  18.        } catch (Throwable commitException) {

  19.            logger.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", commitException);

  20.            throw new ConfirmingException(commitException);

  21.        }

  22.    } else {

  23.        //同步执行事务提交

  24.        commitTransaction(transaction);

  25.    }

  26. }

这个方法执行事务的commit,实际事务提交在commitTransaction中执行,会执行每个事务参与者的commit方法

  1. private void commitTransaction(Transaction transaction) {

  2.    try {

  3.        //调用事务参与者的提交方法

  4.        transaction.commit();

  5.        //事务结束,在数据库删除当前事务,如果commit异常,不会把数据库内事务记录删除,为了重试补偿

  6.        transactionRepository.delete(transaction);

  7.    } catch (Throwable commitException) {

  8.        logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException);

  9.        throw new ConfirmingException(commitException);

  10.    }

  11. }

看了这段逻辑后,我们可以发现,在commit失败的时候,并不会触发rollback,而是不删除数据库事务记录,之后会有定时任务进行扫描重试,后面会讲到这个定时任务

6.rollback

  1. public void rollback(boolean asyncRollback) {


  2.    final Transaction transaction = getCurrentTransaction();

  3.    transaction.changeStatus(TransactionStatus.CANCELLING);


  4.    transactionRepository.update(transaction);


  5.    if (asyncRollback) {


  6.        try {

  7.            executorService.submit(new Runnable() {

  8.                @Override

  9.                public void run() {

  10.                    rollbackTransaction(transaction);

  11.                }

  12.            });

  13.        } catch (Throwable rollbackException) {

  14.            logger.warn("compensable transaction async rollback failed, recovery job will try to rollback later.", rollbackException);

  15.            throw new CancellingException(rollbackException);

  16.        }

  17.    } else {


  18.        rollbackTransaction(transaction);

  19.    }

  20. }

这个方法用于执行事务的回滚逻辑,和commit方法类似,在rollbackTransaction方法中,会执行每个事务参与者的rollback方法

  1. private void rollbackTransaction(Transaction transaction) {

  2.    try {

  3.        //调用事务参与者的提交方法

  4.        transaction.rollback();

  5.         //事务结束,在数据库删除当前事务,如果rollback异常,不会把数据库内事务记录删除,为了重试补偿

  6.        transactionRepository.delete(transaction);

  7.    } catch (Throwable rollbackException) {

  8.        logger.warn("compensable transaction rollback failed, recovery job will try to rollback later.", rollbackException);

  9.        throw new CancellingException(rollbackException);

  10.    }

  11. }

7.cleanAfterCompletion

  1. public void cleanAfterCompletion(Transaction transaction) {

  2.    if (isTransactionActive() && transaction != null) {

  3.        Transaction currentTransaction = getCurrentTransaction();

  4.        if (currentTransaction == transaction) {

  5.            //栈操作,后进先出

  6.            CURRENT.get().pop();

  7.        } else {

  8.            throw new SystemException("Illegal transaction when clean after completion");

  9.        }

  10.    }

  11. }

事务结束,从栈中弹出结束的事务。

8.enlistParticipant

  1. public void enlistParticipant(Participant participant) {

  2.        Transaction transaction = this.getCurrentTransaction();

  3.        transaction.enlistParticipant(participant);

  4.        transactionRepository.update(transaction);

  5.    }

给事务绑定事务参与者并同步到repository

接下来讲下核心的两个切面,这两个切面把上面的所有组件串联在一起使用

核心Aspect

在使用tcc-transaction的时候,我们需要对开启tcc事务的方法加上@Compensable注解,这个注解可以设置以下参数

参数解释
propagation事务传播性,包含REQUIRED(必须存在事务,不存在,创建),SUPPORTS(有事务的话在事务内运行),MANDATORY(必须存在事务),REQUIRES_NEW(不管是否存在,创建新的事务)
confirmMethodconfirm阶段对应的方法
cancelMethodcancel阶段对应的方法
transactionContextEditor设置对应transactionContextEditor
asyncConfirm是否异步confirm
asyncCancel是否异步cancel

@Compensable注解的参数会在下面两个切面中使用到

ConfigurableTransactionAspect

ConfigurableTransactionAspect主要用来控制Transaction的生命周期,内部通过CompensableTransactionInterceptor实现

  1. @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")

  2. public void compensableService() {


  3. }


  4. @Around("compensableService()")

  5. public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {


  6.    return compensableTransactionInterceptor.interceptCompensableMethod(pjp);

  7. }

直接看interceptCompensableMethod方法

  1. public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {


  2.    //解析@Compensable注解

  3.    Method method = CompensableMethodUtils.getCompensableMethod(pjp);

  4.    Compensable compensable = method.getAnnotation(Compensable.class);

  5.    Propagation propagation = compensable.propagation();

  6.    //获取上下文,如果是Root,不会存在上下文,Transaction都还没创建

  7.    TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs());

  8.    boolean asyncConfirm = compensable.asyncConfirm();

  9.    boolean asyncCancel = compensable.asyncCancel();

  10.    boolean isTransactionActive = transactionManager.isTransactionActive();


  11.    if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, propagation, transactionContext)) {

  12.        throw new SystemException("no active compensable transaction while propagation is mandatory for method " + method.getName());

  13.    }


  14.    //计算方法类型,Root对应主事务入口方法,Provider对应远程提供者方的方法,Normal是主事务内消费者方的方法(是代理方法)

  15.    MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext);


  16.    switch (methodType) {

  17.        case ROOT:

  18.            //处理主事务切面

  19.            return rootMethodProceed(pjp, asyncConfirm, asyncCancel);

  20.        case PROVIDER:

  21.            //处理提供者事务切面

  22.            return providerMethodProceed(pjp, transactionContext, asyncConfirm, asyncCancel);

  23.        default:

  24.            //消费者事务直接执行,会对应执行远端提供者事务切面

  25.            return pjp.proceed();

  26.    }

  27. }

在tcc事务内被@Compensable注解的方法分三种

  1. Root方法,就是这次事务的入口方法

  2. Normal方法,在Root方法调用的dubbo接口方法

  3. Provider方法,对应dubbo接口方法的远程实现 被注解的方法都是try的逻辑,confirm和cancel逻辑配置在@Compensable注解参数中

对被@Compensable注解的方法执行切面逻辑的时候,会根据这三种方法类型做不同处理 对于Root方法,执行rootMethodProceed的逻辑

  1. private Object rootMethodProceed(ProceedingJoinPoint pjp, boolean asyncConfirm, boolean asyncCancel) throws Throwable {


  2.    Object returnValue = null;


  3.    Transaction transaction = null;


  4.    try {


  5.        //创建事务

  6.        transaction = transactionManager.begin();


  7.        try {

  8.            returnValue = pjp.proceed();

  9.        } catch (Throwable tryingException) {


  10.            if (isDelayCancelException(tryingException)) {

  11.                transactionManager.syncTransaction();

  12.            } else {

  13.                logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException);


  14.                //回滚事务

  15.                transactionManager.rollback(asyncCancel);

  16.            }


  17.            throw tryingException;

  18.        }


  19.        //提交事务

  20.        transactionManager.commit(asyncConfirm);


  21.    } finally {

  22.        //清理操作

  23.        transactionManager.cleanAfterCompletion(transaction);

  24.    }


  25.    return returnValue;

  26. }

会在被切方法(对应try逻辑)执行前,开启事务,try逻辑执行成功,通过transactionManager的commit方法执行每个事务参与者的commit逻辑,如果try失败,通过transactionManager执行每个参与者的rollback逻辑。

对于Provider方法

  1. private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext, boolean asyncConfirm, boolean asyncCancel) throws Throwable {


  2.    Transaction transaction = null;

  3.    try {


  4.        switch (TransactionStatus.valueOf(transactionContext.getStatus())) {

  5.            case TRYING:

  6.                //使用transactionContext创建分支事务

  7.                transaction = transactionManager.propagationNewBegin(transactionContext);

  8.                //执行被切方法逻辑

  9.                return pjp.proceed();

  10.            case CONFIRMING:

  11.                try {

  12.                    //更新事务状态

  13.                    transaction = transactionManager.propagationExistBegin(transactionContext);

  14.                    //提交事务,不走切面的方法

  15.                    transactionManager.commit(asyncConfirm);

  16.                } catch (NoExistedTransactionException excepton) {

  17.                    //the transaction has been commit,ignore it.

  18.                }

  19.                break;

  20.            case CANCELLING:


  21.                try {

  22.                    //更新事务状态

  23.                    transaction = transactionManager.propagationExistBegin(transactionContext);

  24.                    //回滚事务,不走切面的方法

  25.                    transactionManager.rollback(asyncCancel);

  26.                } catch (NoExistedTransactionException exception) {

  27.                    //the transaction has been rollback,ignore it.

  28.                }

  29.                break;

  30.        }


  31.    } finally {

  32.        //清理资源

  33.        transactionManager.cleanAfterCompletion(transaction);

  34.    }


  35.    Method method = ((MethodSignature) (pjp.getSignature())).getMethod();


  36.    //对于 confirm和 cancel 返回空值

  37.    //主要针对原始类型做处理,因为不能为null

  38.    return ReflectionUtils.getNullValue(method.getReturnType());

  39. }

可以看到在provider类型方法的切面,也就是远程的Participant,如果transaction的status为trying,会通过transactionManager.propagationNewBegin创建分支事务并执行被切方法逻辑,如果是status为confirming或者canceling,会调用对应confirm或cancel配置的方法,跳过被切方法

对于normal类型 直接调用,normal类型的方法是封装了对远程dubbo接口方法调用逻辑的本地代理方法,所以直接执行即可

ConfigurableCoordinatorAspect

ConfigurableCoordinatorAspect主要是为了设置事务的参与者,在一个事务内,每个被@Compensable注解的方法都是事务参与者

  1. @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")

  2. public void transactionContextCall() {


  3. }


  4. @Around("transactionContextCall()")

  5. public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {

  6.    return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp);

  7. }

相关逻辑封装在ResourceCoordinatorInterceptor的interceptTransactionContextMethod方法中

  1. public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {


  2.    //得到当前事务

  3.    Transaction transaction = transactionManager.getCurrentTransaction();


  4.    if (transaction != null) {


  5.        switch (transaction.getStatus()) {

  6.            case TRYING:

  7.                //只需要在trying的时候把参与者信息提取出来,设置到transaction中去

  8.                enlistParticipant(pjp);

  9.                break;

  10.            case CONFIRMING:

  11.                break;

  12.            case CANCELLING:

  13.                break;

  14.        }

  15.    }


  16.    //执行目标方法

  17.    return pjp.proceed(pjp.getArgs());

  18. }

在trying阶段会把所有参与者加入到事务中去,对于Root方法,创建主事务,加入的参与者会包括Root方法对应本地参与者以及Normal方法对应的远程参与者,对于Provider方法,通过主事务上下文创建分支事务,加入的参与者包括Provider方法对应的本地参与者以及它包含的Normal方法对应的远程参与者。这里的远程参与者又可以开启新的分支事务。层级多了,势必会产生性能问题。

接下来看enlistParticipant如何生成参与者对象

  1. private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException {


  2.    //获取@Compensable信息

  3.    Method method = CompensableMethodUtils.getCompensableMethod(pjp);

  4.    if (method == null) {

  5.        throw new RuntimeException(String.format("join point not found method, point is : %s", pjp.getSignature().getName()));

  6.    }

  7.    Compensable compensable = method.getAnnotation(Compensable.class);


  8.    String confirmMethodName = compensable.confirmMethod();

  9.    String cancelMethodName = compensable.cancelMethod();


  10.    Transaction transaction = transactionManager.getCurrentTransaction();

  11.    TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId());


  12.    if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) {

  13.        //设置事务上下文到Editor,Editor用来统一提取上下文,这边对应设置dubbo的rpc上下文中去

  14.        //这边的上下文设置后就会调用try逻辑

  15.        FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs());

  16.    }


  17.    Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes());


  18.    //目前的用法,其实只要保存调用参数就行,因为具体执行confirm和cancel都是根据transaction的status来判断的

  19.    //confirm的调用上下文

  20.    InvocationContext confirmInvocation = new InvocationContext(targetClass,

  21.            confirmMethodName,

  22.            method.getParameterTypes(), pjp.getArgs());


  23.    //cancel的调用上下文

  24.    InvocationContext cancelInvocation = new InvocationContext(targetClass,

  25.            cancelMethodName,

  26.            method.getParameterTypes(), pjp.getArgs());


  27.    Participant participant =

  28.            new Participant(

  29.                    xid,

  30.                    confirmInvocation,

  31.                    cancelInvocation,

  32.                    compensable.transactionContextEditor());


  33.    //把participant设置到transaction,并且同步到持久化存储

  34.    transactionManager.enlistParticipant(participant);


  35. }

通过从@Compensable注解配置的信息以及当前Transaction来配置Participant。 在Participant设置到Transaction后,会执行pjp.proceed(pjp.getArgs()),也就执行了对应try逻辑的被切方法

ConfigurableCoordinatorAspect的逻辑会在ConfigurableTransactionAspect后执行,这和它们设置的order有关,小的order先执行,后切入

失败补偿机制

对于失败的Confirm和Cancel操作,会有补偿任务进行重试,具体实现类为RecoverScheduledJob,在这个类的init方法会启动quartz任务

  1. public void init() {


  2.    try {

  3.        MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();

  4.        //设置定时任务执行的对象和方法

  5.        jobDetail.setTargetObject(transactionRecovery);

  6.        jobDetail.setTargetMethod("startRecover");

  7.        jobDetail.setName("transactionRecoveryJob");

  8.        jobDetail.setConcurrent(false);

  9.        jobDetail.afterPropertiesSet();


  10.        CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean();

  11.        cronTrigger.setBeanName("transactionRecoveryCronTrigger");

  12.        //设置cron表达式

  13.        cronTrigger.setCronExpression(transactionConfigurator.getRecoverConfig().getCronExpression());

  14.        cronTrigger.setJobDetail(jobDetail.getObject());

  15.        cronTrigger.afterPropertiesSet();


  16.        scheduler.scheduleJob(jobDetail.getObject(), cronTrigger.getObject());

  17.        scheduler.start();


  18.    } catch (Exception e) {

  19.        throw new SystemException(e);

  20.    }

  21. }

在这个方法里会使用RecoverConfig的配置初始化定时任务,定时任务具体的执行逻辑使用MethodInvokingJobDetailFactoryBean的targetObject和targetMethod配置,对应为transactionRecovery的startRecover方法,我们来看下这个方法

  1. public void startRecover() {

  2.        //获取所有没被处理的transaction

  3.        List<Transaction> transactions = loadErrorTransactions();

  4.        //根据规则处理这些transaction

  5.        recoverErrorTransactions(transactions);

  6.    }

分别看下上述两个方法的逻辑

  1. private List<Transaction> loadErrorTransactions() {



  2.        long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();


  3.        TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();

  4.        RecoverConfig recoverConfig = transactionConfigurator.getRecoverConfig();


  5.        //获取在RecoverDuration间隔之前未完成的transaction

  6.        return transactionRepository.findAllUnmodifiedSince(new Date(currentTimeInMillis - recoverConfig.getRecoverDuration() * 1000));

  7.    }

  1. private void recoverErrorTransactions(List<Transaction> transactions) {



  2.    for (Transaction transaction : transactions) {


  3.        //重试次数超过上限的Transaction不再执行补偿

  4.        if (transaction.getRetriedCount() > transactionConfigurator.getRecoverConfig().getMaxRetryCount()) {


  5.            logger.error(String.format("recover failed with max retry count,will not try again. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)));

  6.            continue;

  7.        }


  8.        //如果是分支事务,并且超过最长超时时间忽略

  9.        if (transaction.getTransactionType().equals(TransactionType.BRANCH)

  10.                && (transaction.getCreateTime().getTime() +

  11.                transactionConfigurator.getRecoverConfig().getMaxRetryCount() *

  12.                        transactionConfigurator.getRecoverConfig().getRecoverDuration() * 1000

  13.                > System.currentTimeMillis())) {

  14.            continue;

  15.        }


  16.        try {

  17.            transaction.addRetriedCount();


  18.            //对超时的confiring操作重试

  19.            if (transaction.getStatus().equals(TransactionStatus.CONFIRMING)) {


  20.                transaction.changeStatus(TransactionStatus.CONFIRMING);

  21.                transactionConfigurator.getTransactionRepository().update(transaction);

  22.                transaction.commit();

  23.                transactionConfigurator.getTransactionRepository().delete(transaction);


  24.            } else if (transaction.getStatus().equals(TransactionStatus.CANCELLING)//对超时的Canceling操作重试,或者Root超时的trying进行cancel操作

  25.                    || transaction.getTransactionType().equals(TransactionType.ROOT)) {


  26.                transaction.changeStatus(TransactionStatus.CANCELLING);

  27.                transactionConfigurator.getTransactionRepository().update(transaction);

  28.                transaction.rollback();

  29.                transactionConfigurator.getTransactionRepository().delete(transaction);

  30.            }


  31.        } catch (Throwable throwable) {


  32.            if (throwable instanceof OptimisticLockException

  33.                    || ExceptionUtils.getRootCause(throwable) instanceof OptimisticLockException) {

  34.                logger.warn(String.format("optimisticLockException happened while recover. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);

  35.            } else {

  36.                logger.error(String.format("recover failed, txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);

  37.            }

  38.        }

  39.    }

  40. }

注意一点,trying阶段不会重试,失败未处理,会触发canceling操作

思考

分布式事务解决方案

下面列举一些分布式事务解决方案的特性

  1. 传统的二/三阶段提交 这种解决方案会占用数据库事务资源,在互联网公司很少使用

  2. 异步确保型事务  基于可靠消息的最终一致性,可以异步,但数据绝对不能丢,而且一定要记账成功 这个难道是依赖mq支持的事务特性?

  3. 最大努力通知型事务  按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接口进行核对 目前项目比较常用的方式

  4. tcc 适用于实时性要求比较高,数据必须可靠的场景

我的理解

依照我目前的工作经验,现在公司对分布式事务的解决方案一般是上述的第三种方法,但是一些对实时性要求比较高,数据必须可靠的场景我们还是可以考虑使用tcc的,但是也没必要全盘tcc,可以和最大努力通知型事务一起使用

对于tcc还有一个疑问,在高并发情况下,在mq的模式下,由于是异步,能够保证消息最终被消费掉,并且消费速率稳定,而tcc这种模式,会不会导致接口资源不够用,接口资源都占用满,导致不断的try失败。

由此可见tcc的使用难度不止在业务使用方式上,对于一些极限的场景,需要有经验的人来分析tcc该使用在多大范围内。但是如果是并发量不大的项目,大家可以试着使用。


文章转载自盛超杰的笔记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论