暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

ByteTCC源码阅读

风火说 2020-05-14
444

ByteTCC源码阅读


  • 名词定义

  • 重点类关注

    • supports.CompensableInvocationImpl

    • org.bytesoft.compensable.archive.CompensableArchive

  • TCC事务流程解析

    • 构建TCC调用实例

    • 事务管理器开启事务

    • 监听资源入列到本地事务

    • 调用另外一个TCC事方法

    • 业务方法结束

  • 故障恢复

  • 清除日志

  • 事务日志

  • 远端调用拦截

    • 发起方

    • 接收方

  • 恢复相关

    • 从日志恢复对应的TransactionArchive对象

    • 从TransactionArchive中执行恢复动作

  • CleanupWork

  • CleanupFile

    • 清理

    • 压缩

  • 文件格式

    • CleanUp文件格式

    • 补偿日志格式

  • 设计思想

  • 细节相关

    • XID相关


名词定义

本地事务

JDBC单机事务

TCC调用

一个方法如果注解了Compensable
,则意味着声明该方法是一个TCC调用。一个TCC调用不仅仅只是当前方法,还意味着与之关联的确认方法和取消方法,也就是同一个接口的三个不同含义的实现。

TCC调用需要在TCC事务的范围内起作用。一个TCC事务内可以多于一个的TCC调用。

TCC事务

与本地事务类似,一个TCC事务用于保证在该事务范围内的操作,具备数据一致。但是TCC事务并不是由数据库提供的特性,而是一系列的辅助逻辑来达成的数据一致性保证。TCC事务包含两个阶段:尝试阶段(try),完成阶段(complete)。对于参与到TCC事务中的每一个TCC调用,都有这两个阶段。

TCC事务本质上是协调参与TCC事务中的各方的本地事务来达成整体的数据一致性。

尝试阶段

尝试阶段指的是执行业务逻辑的阶段。需要注意,尝试阶段需要在一个本地事务的作用域下。同时,尝试阶段中可能会出现对远端服务的调用,此时该远端服务的调用就纳入了TCC事务的作用范围。

完成阶段

当尝试阶段执行完毕后,接着执行完成阶段。完成阶段有两个独立的分支:确认(confirm)和取消(cancel)。如果尝试阶段的代码执行正确,没有抛出被框架感知的异常(也就是业务代码自己catch住了异常的也算是正常执行完毕尝试阶段),则执行确认分支;反之则执行取消分支。

确认分支和取消分支的执行逻辑是大体相似的。都是遍历参与到TCC事务中的TCC调用对应的确认方法和取消方法,并且依靠Bean容器去执行这些方法。如果都能执行完毕,则将事务的数据从系统中清除;如果出现执行异常的情况,则将TCC事务标记为异常事务,放入事务仓库。

后台的定时恢复任务会从事务仓库中获取异常事务,并且尝试重新发起完成阶段,直到完成阶段成功或者该事务最终超过恢复时限。

需要注意的是,完成阶段中对确认方法或者取消方法的执行,也需要在本地事务的作用域下。

恢复日志

为了避免宕机导致的事务数据丢失,TCC事务的中状态变化,归档的增减,状态变化,远端资源的增减和状态变化都会写入日志。

有了恢复日志,在系统宕机重启后就可以从恢复日志中将TCC事务信息还原回来。

清除日志

TCC事务为了保证数据的一致性,需要依靠将xid信息和本地业务放在一个本地事务中一起提交。当TCC事务完成时,就需要清除对应的xid信息,于是将这个信息写入到清除日志。由定时线程定时读取这个日志,并且将对应的xid信息从表中删除。

重点类关注

supports.CompensableInvocationImpl

TCC调用对象,包含以下属性

    private String declaringClass;
    //TCC调用的方法名
    private String methodName;
    //TCC调用的方法的参数类型的字符串表达数组
    private String[] parameterTypeArray;
    //TCC调用的method对象
    private transient Method method;
    //本次TCC调用的入参
    private Object[] args;
    //TCC调用对应的确认分支的方法的类在容器中的Bean ID
    private String confirmableKey;
    //TCC调用对应的取消分支的方法的类在容器中的Bean ID
    private String cancellableKey;
    //该TCC调用方法所在的类在容器中的Bean ID
    private Object identifier;
    //是否简单类型的TCC调用
    private boolean simplified;
    //该TCC调用是否注册到TCC事务中
    private boolean enlisted;

通过补偿方法拦截器,每次调用补偿方法时,就会生成该对象,并且将其注册到当前的TCC事务中。

org.bytesoft.compensable.archive.CompensableArchive

TCC归档对象。包含以下属性

    //标识符,其中的全局ID部分和当前TCC事务的xid中的全局ID相同
    private Xid identifier;
    //未发现作用
    private boolean coordinator;
    //该归档信息对应的TCC调用对象
    private CompensableInvocation compensable;
 
    /* try-phase. */
    //作用于该TCC调用尝试阶段的本地事务使用的数据源在Bean容器中的Bean ID
    private String transactionResourceKey;
    //作用于该TCC调用尝试阶段的本地事务的xid,该xid是有分支标识符的
    private Xid transactionXid;
    //该TCC调用的尝试阶段是否在本地事务的保护下完成
    private boolean tried;
 
    /* confirm/cancel phase. */
    //作用于该TCC调用完成阶段的本地事务使用的数据源在Bean容器中的Bean ID。
    private String compensableResourceKey;
    //作用于该TCC调用完成阶段的本地事务使用的xid。该xid是有分支标识符的。
    private Xid compensableXid;
    //该TCC调用完成阶段中的confirm分支是否执行完毕
    private boolean confirmed;
    //该TCC调用完成阶段中的cancel分支是否执行完毕
    private boolean cancelled;

TCC事务流程解析

一次完整的TCC事务调用会经历很多个目的不同的阶段,下面按照每个阶段的不同进行分析。

构建TCC调用实例

方法或者类级别注解了org.bytesoft.compensable.Compensable
,使得该方法会被首先被拦截器CompensableMethodInterceptor
识别并且拦截,此时会进入到拦截方法CompensableMethodInterceptor#invoke(org.aspectj.lang.ProceedingJoinPoint)

因为注解是可以注解在类上的,所以拦截器需要判断当前方法是否是一个TCC调用。如果不是的话,则直接放行。

如果判断当前方法是一个tcc操作的接口方法时,创建一个supports.CompensableInvocationImpl
实例,并且将该实例压入与当前线程绑定的堆栈内。方便后续的程序中当前线程获取该信息。使用堆栈,是因为一个TCC调用内可以嵌套另外的TCC调用。

当方法执行完毕后,该CompensableInvocationImpl
实例会被弹出堆栈。

事务管理器开启事务

CompensableMethodInterceptor
最先发挥拦截作用,然后紧接着是Spring的事务拦截器org.springframework.transaction.interceptor.TransactionInterceptor#invoke
。事务拦截器判断事务注解的存在,通过JTA的javax.transaction.UserTransaction
的begin方法,进行事务的开启。在ByteTCC中该实现类为UserCompensableImpl
。该begin方法实际上是委托给TransactionManagerImpl#begin
的。

方法TransactionManagerImpl#begin
被调用,执行事务开启。当前线程绑定的堆栈中存在TCC调用实例,因此该方法是一个TCC的try方法,将后续操作委托给方法org.bytesoft.compensable.CompensableManager#compensableBegin
,通过该方法进行TCC事务的开启工作。该方法主要有几个步骤:

  1. 创建事务上下文对象org.bytesoft.compensable.TransactionContext
    ,并且进行合适的属性设置,包括:是否协调者(是),是否TCC事务(是),事务发起者标识符(TCC协调者在Bean容器中的Bean ID),xid属性(这个xid只有全局ID)。使用该上下文对象初始化TCC事务对象CompensableTransactionImpl
    。将TCC事务对象关联当前线程。
  2. 创建事务上下文对象org.bytesoft.compensable.TransactionContext
    ,设置其xid属性(与步骤一的值相同),使用该上下文对象启动方法CompensableManagerImpl#invokeBegin
  3. 将TCC事务放入事务仓库,以xid为key。
  4. 对TCC事务生成事务归档对象org.bytesoft.compensable.archive.TransactionArchive
    ,将该对象序列化后写入恢复日志。

着重看下步骤二,该方法的调用主要有以下步骤:

  1. 获取本地事务的参与者(也就是接口org.bytesoft.transaction.TransactionParticipant
    的实现类),调用其org.bytesoft.transaction.TransactionParticipant#start
    方法,将传入的事务上下文对象作为参数传入。这个方法会返回一个org.bytesoft.transaction.Transaction
    对象(这个接口继承了JTA中的事务接口)。这个步骤的作用就是开启本地事务。
  2. 将本地事务和TCC事务各自设置为彼此的额外信息,主要作用是后续的方法调用需要彼此能够获取到对应的信息。
  3. 将当前的TCC调用注册到TCC事务中。
    1. 设置该TCC调用对象的enlisted属性为true。
    2. 新建TCC归档对象org.bytesoft.compensable.archive.CompensableArchive
      ,设置属性包括:标识符(由事务上下文的全局ID和新的分支标识符构成的xid),tcc调用对象(就是步骤一的tcc调用对象)。
    3. 将TCC归档对象加入到TCC事务的archiveList
      列表中。
    4. 将TCC归档对象加入到xidToArchivesMap
      的value代表的list中。这里的key是本地事务的事务上下文的xid。所以这个步骤就是为了将一个TCC事务中涉及到的不同的本地事务关联的TCC归档对象区分开。

从效果来看,步骤二的目的就是开启了本地事务,并且将当前的TCC调用注册到TCC事务中。

监听资源入列到本地事务

ByteTCC的本地事务依靠ByteJTA完成,其内部也是遵循JTA的套路进行的。因此当业务操作执行到获取数据库连接的相关部分时,其数据源org.bytesoft.bytejta.supports.jdbc.LocalXAResource
在执行getConnection
方法获取连接时,会执行方法javax.transaction.Transaction#enlistResource
,进行资源入列(不过这个XAResource是模拟的,因为并不需要本地连接是XA的)。ByteJTA的实现中,在这里做了通知器,因此会触发通知方法org.bytesoft.transaction.supports.TransactionResourceListener#onEnlistResource
,其实现是CompensableTransactionImpl#onEnlistResource
CompensableTransactionImpl#onEnlistResource
根据当前TCC事务所处阶段执行不同的分支流程。由于当前处于尝试阶段,因此委托方法CompensableTransactionImpl#onInvocationPhaseEnlistResource
继续执行。该方法主要有两个内容:

  1. 遍历与当前本地事务相关联TCC归档信息,并且设置数据源属性。
    1. 将本地事务的xid设置到TCC归档的transactionXid属性。
    2. 将本地事务使用的数据源在Bean容器中的Bean ID设置到TCC归档的transactionResourceKey属性。
    3. 创建一个xid,其中全局ID为新生成的,分支标识符值为本地事务的xid中的全局ID。将这个xid设置到TCC归档的compensableXid属性。
    4. 都设置后,将更新后的TCC归档信息写入到日志。
  2. 新建TransactionBranch对象,设置本地事务的xid为其branchIxd属性,设置本地事务对应的数据源的Bean ID为其resourceKey属性。将该TransactionBranch对象作为value放入xidToBranchMap
    ,key为全新的xid,只包含和本地事务xid相同的全局ID,没有分支标识符。

调用另外一个TCC事方法

TCC方法在执行过程中,也可能会调用另外的TCC方法。此时会遇到两种情况:

  1. 新的TCC方法直接使用当前TCC方法的事务。
  2. 新的TCC方法要求新建新的事务来运行。

使用当前事务

与第一次调用相同,首先被方法CompensableMethodInterceptor#invoke
拦截,执行其内部逻辑。与第一次不同的地方在于,当前线程中已经绑定了本地事务和TCC事务,因此这里会调用org.bytesoft.compensable.CompensableTransaction#registerCompensable
方法将TCC调用注册到TCC事务中。

这里与TCC事务第一次注册TCC调用有所不同。第一次注册时,只是简单的新建TCC归档对象,并且放入列表中,没有设置本地事务相关的信息,本地事务相关的信息是等到程序申请数据库链接时通过通知器的方式设置进来。然而内部调用第二个TCC方法时使用了当前事务,因此就不会再申请新的数据库链接,也就无法通过通知器的方式来进行设置了。

不过每一个在TCC事务期间开启的本地事务都存储了本地事务信息在xidToBranchMap
属性中。此时方法判断TCC事务当前关联的本地事务的全局ID在xidToBranchMap有value,就可以取出对应的TransactionBranch
对象,依靠其中的本地事务信息,对这次新建的TCC归档对象设置其关联的本地事务信息。并且一样也设置了compensableXid后写入日志。

从这里也能的出来,参与到TCC事务的本地事务可能会作用于多于一个的TCC调用。此时这些TCC调用的数据一致性是在一个本地事务的保护下。

当方法执行完毕退出方法时,Spring的事务拦截器会会执行拦截并且判断是否需要事务提交。由于这个方法使用了之前方法的事务,因此此时不会进行本地事务的提交。

使用新事务

如果嵌套调用的补偿方法的传播行为要求使用全新的事务,则逻辑和使用当前事务有很大不同。

事务开启

第一个不同体现在事务的开启阶段。因为传播行为要求启用新的事务进行处理,因此当前事务首先要挂起,Spring会调用事务管理器的TransactionManagerImpl#suspend
方法执行事务挂起。如果当前存在TCC事务的话,则会继续委托给方法CompensableManagerImpl#suspend
进行事务挂起。该方法主要执行了:

  • 调用本地事务的事务管理器(也就是本地事务的JTA管理器),对事务进行挂起
  • TCC事务上下文中的传播层级+1
  • TCC事务中解绑与当前线程绑定的本地事务

当前的事务挂起后,则会调用TransactionManagerImpl#begin
方法开启事务。和之前相比不同,由于当前已经存在TCC事务,因此本次事务开启委托给了方法CompensableManagerImpl#begin

创建一个全新的xid(新的全局ID),使用该xid初始化一个事务上下文,调用方法CompensableManagerImpl#invokeBegin
。内部的流程和第一次TCC方法调用是一样的。

资源入列

资源入列和之前的流程是相同的,也是在程序申请数据库链接的时候通过通知器执行了资源入列。只是需要注意,由于此时本地事务的xid不同,因此TCC事务中的xidToBranchMap
的size是2。

事务提交

由于当前的TCC方法使用了全新的事务,因此在方法退出时会触发事务的提交,Spring的事务管理器委托方法CompensableManagerImpl#commit
进行事务的提交。

由于当前存在TCC事务,并且TCC事务上下文中的传播层级大于0,因此委托方法CompensableManagerImpl#commit
进行事务提交。该方法相比CompensableManagerImpl#compensableCommit
实现比较简单,因为后者是整个TCC事务的提交,前者只是当前本地事务的提交。

该方法最终会委托方法CompensableManagerImpl#invokeTransactionCommitIfNecessary
进行本地事务的提交。invokeTransactionCommitIfNecessary
会判断当前的事务是否被设置会只允许回滚,如果没有的话,继续委托方法CompensableManagerImpl#invokeTransactionCommit
进行事务提交。该方法会委托方法CompensableManagerImpl#invokeTransactionCommitIfLocalTransaction
执行本地事务提交,并且无论成功与否,都会解绑TCC事务和当前线程绑定的本地事务。

当本地事务提交成功时,触发了通知方法CompensableTransactionImpl#onInvocationPhaseCommitSuccess
。由于当前TCC事务的传播层级大于0,因此触发方法CompensableTransactionImpl#onInvocationPhaseParticipantCommitSuccess
。该方法主要是:

  • 以当前提交成功的本地事务的xid从TCC事务的xidToArchivesMap
    中获得参与到该本地事务的CompensableArchive
    列表。
  • 遍历该列表,为每一个元素设置tried
    属性为true,将元素从列表删除,并且将更新的TCC归档对象写入日志。

事务回滚

如果业务方法抛出了异常,Spring的事务管理器会进行回滚,并且委托给方法TransactionManagerImpl#rollback
。该方法判断当前存在TCC事务并且传播层级大于0,于是继续委托给方法CompensableManagerImpl#rollback
执行回滚。这个回滚只是回滚当前线程绑定的本地事务。该方法进行一些状态检查后,继续委托给方法CompensableManagerImpl#invokeTransactionRollback
invokeTransactionRollback
的逻辑主要有:

  1. 获取本地事务参与者对象,对本地事务进行结束和回滚。主要是执行方法org.bytesoft.bytejta.TransactionCoordinator#end(org.bytesoft.transaction.TransactionContext, int)
    org.bytesoft.bytejta.TransactionCoordinator#rollback
  2. TCC事务和当前的本地事务解绑。

事务恢复

在新开的事务提交完成后,Spring的事务管理器则会调用事务管理器的事务恢复接口将之前的事务进行恢复。首先是执行到方法TransactionManagerImpl#resume
。由于当前存在TCC事务,因此委托给方法CompensableManagerImpl#resume
。该方法主要有:

  • 获取本地事务的事务管理器,将入参的事务执行恢复。
  • 从该事务中获取TCC事务对象,将该事务设置为TCC事务的额外信息
  • TCC事务的事务上下文中的传播层级-1。

业务方法结束

在最外围的业务方法完成后,Spring的事务拦截器会进行事务提交,事务的提交被委托给用户事务接口,其实现UserCompensableImpl
则将commit方法进一步委托给TransactionManagerImpl#commit
。该commit逻辑通过上下文发现:当前存在TCC事务,该事务是协调者且传播层级为0,这些条件使得事务管理器将后续操作委托给CompensableManagerImpl#compensableCommit
方法用于进行TCC事务提交。compensableCommit
方法是一个步骤型方法,只是用于明确整个TCC事务提交的具体步骤,包含:

  1. 各种必要的状态检查。
  2. 将TCC事务与当前线程解绑。
  3. 执行TCC事务的提交逻辑。
  4. TCC事务提交成功的情况下,将事务信息写入清除日志。

1,2,4步骤很简单,略过。下面着重看下步骤三,该步骤委托方法CompensableManagerImpl#invokeCompensableCommit
完成。具体逻辑有:

  1. 提交本地事务,无论成功与否,完成后均将本地事务和TCC事务解绑。
  2. 执行完成阶段。如果本地事务提交成功,执行完成阶段的确认分支;如果本地事务提交失败,执行完成阶段的取消分支。
  3. 错误事务入库事务仓库。如果完成阶段执行出错,将TCC事务放入事务仓库的错误事务集合,等待后续的重试。

下面针对每一个步骤进行展开。

步骤一:提交本地事务

从Spring中取得本地事务参与者,从TCC事务中取得本地事务实例,调用方法org.bytesoft.bytejta.TransactionCoordinator#end(org.bytesoft.transaction.TransactionContext, int)
取消事务在事务管理器中的关联,调用方法org.bytesoft.bytejta.TransactionCoordinator#commit
执行本地事务的提交。

这里的本地事务提交走的JTA的套路,也就是先执行资源的出列,进而调用到XAResour.end
方法。这里特别注意下,TCC之中由于不需要真正的XA,因此都是依靠LocalXAResource
这个类来模拟XA动作的。其end方法会往bytejta
表插入一行记录(内容就是xid,gxid,bxid)。这行记录是为了给本地事务的提交保存一个记录,以避免出现本地事务提交了,但是文件日志没有记录系统却立刻宕机导致的数据丢失问题。

本地事务的成功提交会触发通知方法CompensableTransactionImpl#onCommitSuccess
。由于当前是尝试阶段,继续委托方法CompensableTransactionImpl#onInvocationPhaseCommitSuccess
,由于当前TCC事务是协调者且传播层级等于0,继续委托方法CompensableTransactionImpl#onInvocationPhaseCoordinatorCommitSuccess
。该方法会执行以下逻辑:

  1. xidToArchiveMap
    中以当前本地事务的事务上下文的xid获取关联的TCC归档列表。清空列表,为列表中的每一个元素设置tried属性为true。
  2. 创建TCC事务归档对象,设置状态为javax.transaction.Status#STATUS_COMMITTING
    ,并且写入日志。

步骤二:执行完成阶段

如果本地事务提交成功,则执行完成阶段的确认分支,依靠方法CompensableManagerImpl#fireCompensableCommit
;如果本地事务提交失败或者异常,则执行完成阶段的取消分支,依靠方法CompensableManagerImpl#fireCompensableRollback

首先来看确认分支

fireCompensableCommit
只定义了TCC事务的确认分支提交的流程,分别有:

  1. 将TCC事务与当前线程绑定。该步骤主要是为了避免后续的操作中各个方法之间对TCC事务对象的传递。
  2. 执行方法CompensableTransactionImpl#commit
    进行TCC事务提交。
  3. 将TCC事务与当前线程解绑。

流程1和3无需多说,来看流程2。commit方法内部执行了一些状态检查后,继续委托方法CompensableTransactionImpl#fireCommit
执行提交。提交的流程如下:

  1. 设置TCC事务的事务上下文的完成阶段标识位为true,设置TCC事务的事务状态为`STATUS_COMMITTING。创建TCC事务归档,写入日志。
  2. 执行本地参与者的确认方法,也就是方法CompensableTransactionImpl#fireNativeParticipantConfirm
    1. 逆序遍历TCC事务中的archiveList,逆序的考量是因为这种顺序,最后执行的最优先确认,在数据一致性上有可能更好。当然,正序遍历其实也是无妨的。
    2. 对每一个TCC归档对象,获取其compensable
      属性,也就是TCC调用信息。如果该TCC调用信息的confirmableKey不为空,意味着该TCC操作的确认阶段的确认方法不为空,则使用confirmableKey作为Bean ID从容器中获取对应的Bean,执行与尝试阶段相同的接口,完成确认分支。该确认分支也是需要在事务环境下执行的。特别注意,在TCC操作的尝试阶段,每一个TCC调用在注册到TCC事务时,都对应生成了一个TCC归档对象,其中包含了一个完成阶段使用的XID,在执行完成阶段的确认方法开启事务时,这个xid就会作为事务开启的XID使用
    3. 在执行完成阶段的确认分支时,一样存在资源入列事务的通知触发,由于此时处于完成阶段,因此通知方法最终执行到CompensableTransactionImpl#onCompletionPhaseEnlistResource
      。该通知方法为当前正在执行完成阶段的TCC归档对象设置compensableResourceKey
      属性,值为作用于该完成阶段的确认方法的本地事务使用的数据源的Bean ID。设置成功后写入日志。
    4. 确认方法的事务提交成功时会触发通知方法CompensableTransactionImpl#onCompletionPhaseCommitSuccess
      。该方法进行一些校验后,会将当前确认方法对应的TCC归档对象中的confirmed属性设置为true,更新后写入日志。
  3. 执行远端参与者的confirm流程,也就是方法CompensableTransactionImpl#fireRemoteParticipantConfirm
    1. 遍历resourceList,为每一个元素执行方法org.bytesoft.transaction.archive.XAResourceArchive#commit
    2. commit方法实际是通过JDK代理委托给方法supports.springcloud.SpringCloudCoordinator#invokePostCoordinator
      。该方法的内容其实很简单,就是构建出一个Http的Post调用,将commit请求转化为一个http请求org/bytesoft/bytetcc/commit/f832e474c7305362b9925601f27d4656/true
      。后两个是参数xid和onePhaseCommit的值。由于远端实际也是单机事务,因此这里的onePhaseCommit必然为true。远端的方法也会检测该参数的值,如果不为true,则会抛出异常。
    3. 远端实现注册了一个端点,以SpringBoot为例,是supports.springcloud.controller.CompensableCoordinatorController
      (框架也提供了别的RPC端点,比如Dubbo)。接收上级节点的对应请求,比如当收到commit请求时,则会调用方法org.bytesoft.bytetcc.CompensableCoordinator#commit
      。该方法内部会完成提交并且成功后清除日志的功能。
  4. 将TCC事务的事务状态设置为已提交。创建TCC事务归档,写入日志。

接着看看取消分支

CompensableManagerImpl#fireCompensableRollback
只是定义了取消分支的流程,分别是:

  1. 将TCC事务和当前线程绑定,这个意义只是为了避免后续对TCC事务不停的传参。
  2. 执行TCC的回滚逻辑。
  3. 将TCC事务和当前线程解绑。

步骤一和三略过,来看步骤二。CompensableTransactionImpl#rollback
进行一些状态检查,委托方法CompensableTransactionImpl#fireRollback
执行回滚。逻辑有:

  1. 设置事务状态为javax.transaction.Status#STATUS_ROLLING_BACK
  2. 设置当前本地事务为只回滚,不过这个只对参与者生效,当前TCC事务是协调者的情况下,没有作用。
  3. 更新TCC事务的事务状态的compensating
    属性为true,更新后写入日志。
  4. 执行本地参与者的取消方法。
    1. 逆序遍历archiveList
      ,对每一个元素进行操作。
    2. 如果TCC归档对象的tried属性为false,则意味着对应的尝试阶段的事务没有提供成功,也就无需执行取消方法,则忽略该元素。
    3. 如果TCC归档对象的canceled属性为true,意味着其取消方法已经成功执行过了,忽略该元素。
    4. 如果TCC归档对象关联的TCC调用对象的cancellableKey
      为空,则直接设置其canceled
      属性为true。
    5. 如果TCC归档对象关联的TCC调用对象的cancellableKey
      属性不为空,意味着存在对应的取消方法。则通过Bean容器去执行这个方法。这个方法也是在本地事务的作用域下执行的。当这个方法开启事务,申请链接资源时,会发出通知方法开启事务,在申请链接资源时,会触发通知方法CompensableTransactionImpl#onCompletionPhaseEnlistResource
      。该方法会设置该补偿归档信息在完成阶段使用的datasource的标识符。设置后将更新变化写入到日志中。在事务提交后,会触发通知方法CompensableTransactionImpl#onCompletionPhaseCommitSuccess
      。判断当前是执行完成阶段的取消方法,设置当前的TCC归档对象的的cenceled
      属性为true。更新后将更新信息写入日志。
  5. 执行远程参与者的取消方法,如果过程中发生异常,则抛出异常。
  6. 如果执行本地参与者的取消方法时发生过错误,则抛出错误。否则将事务的状态更新为javax.transaction.Status#STATUS_ROLLEDBACK
    ,生成新的TCC事务归档,写入日志。

步骤三:错误事务入库事务仓库

如果TCC事务的完成阶段出错,则后续需要进行必要的重试。此时先将TCC事务对象放入事务仓库的错误事务集合中。后续其他恢复线程可以从事务仓库中进行读取并且执行重试。

故障恢复

如果在执行完成阶段时报错,则TCC事务没有正确结束,就会放入事务仓库的错误事务集合。框架定义了定时任务work.CompensableWork
。该类按照配置的频率执行方法TransactionRecoveryImpl#timingRecover
进行错误事务的重试恢复。该方法遍历错误事务列表,针对每一个事务调用方法TransactionRecoveryImpl#recoverTransactionIfNecessary
进行事务恢复。该方法会进行超时判断,还在恢复超时内的事务才会执行事务恢复。确认没有超时,委托给方法TransactionRecoveryImpl#recoverTransaction
进行恢复。根据当前TCC事务是否协调者本身,有不同的走向:

  • 是:调用接口CompensableTransactionImpl#recover
    进行事务本身的恢复工作,接着调用方法TransactionRecoveryImpl#recoverCoordinator
  • 否:调用接口CompensableTransactionImpl#recover
    进行事务本身的恢复工作,接着调用方法TransactionRecoveryImpl#recoverParticipant

无论是否协调者,都先执行CompensableTransactionImpl#recover
。该方法根据当前事务的状态判断是确认分支还是取消分支的恢复,并且生成标识符作为结果。将标识符传递给两个方法:CompensableTransactionImpl#recoverNativeResource
CompensableTransactionImpl#recoverRemoteResource
。下面分别来看这两个方法。

CompensableTransactionImpl#recoverNativeResource

在TCC事务执行完成阶段时,在事务提交前,会在bytejta表中插入一行记录,主要内容就是作用于该TCC完成阶段的本地事务的xid,而这个xid是在注册TCC调用到TCC事务的时候事先初始化的。由于业务操作和插入bytejta表是在一个本地事务,因此可以通过判断bytejta表是否存在指定的xid记录来确定指定的TCC调用的完成阶段是否正确提交。

recoverNativeResource
方法的核心逻辑就是遍历TCC事务的archiveList
列表,反向遍历其中的元素,获取每一个元素的compensableResourceKey
属性的值,也就是这个TCC调用在完成阶段使用的数据源的标识符,并且通过这个标识符找到对应的数据源,在这个数据源的bytejta表中查询是否存在TCC调用在完成阶段使用的xid作为主键的记录。如果存在,意味着TCC调用在完成阶段的事务已经成功提交,此时只需要设置TCC调用归档信息的confirmed或者canceled属性即可,根据一开始传入的标识符。

也就是说,基于本地事务的一致性保证,通过检查bytejta表的记录,就可以确认之前的确认/取消方法是否已经执行完毕,如果执行过,则忽略。

CompensableTransactionImpl#recoverRemoteResource

目前暂无实现。

在资源层面的恢复确认后,开始重试尝试。

当TCC事务是协调者时,也就是执行方法org.bytesoft.bytejta.TransactionRecoveryImpl#recoverCoordinator
。该方法根据给定事务的状态是执行确认分支的重试还是取消分支的重试。但是两个方法的内在逻辑十分相似,都是三个步骤:

  1. 判断系统当前是否处于日志恢复模式,如果是的话,调用CompensableTransactionImpl#recover
    进行资源层面的恢复判断。
  2. 将TCC事务的事务上下文的重试次数+1,;更新事务上下文中的创建时间为当前时间。
  3. 根据当前是确认分支的重试还是取消分支的重试执行对应的方法,分别是CompensableTransactionImpl#fireCommit
    CompensableTransactionImpl#fireRollback
    ,实际就是再次执行commit或者rollback。commit或rollback方法的内部在执行每一个TCC调用的完成阶段时都会首先判断是否其是否已经执行过了。因此反复调用这些方法是安全的。

清除日志

TCC事务的完成阶段如果顺利结束,都会将自身写入清除日志。也就是调用方法CompensableTransactionImpl#forget

该方法的核心就是组装条件,用于调用方法supports.resource.LocalResourceCleaner#forget
。层层委托,最终委托给了work.vfs.CleanupFile#forget(javax.transaction.xa.Xid, java.lang.String)

这个forget调用的内容也很简单,就是将xid和resource的标识符(bean ID)写入到文件。上文提到过,本地事务提交时,会在表bytejta中新增一条记录。如果TCC事务成功完成,则该记录就没有意义了,forget写入到文件中的内容,后续另外一个线程会读取出来,并且依靠文件中的记录将bytejta中的记录删除。

事务日志

事务日志的作用主要是用于在系统宕机后重新对事务进行重建,并且将未完成的事务部分继续完成。因此设计的原则就是在事务的每一次变化时都写入日志,包括有:创建,注册TCC调用,注册远程调用,TCC调用执行完成阶段,远程调用完成阶段,每一次事务状态的变化。

调用顺序:

  1. TCC事务开启,调用方法logging.SampleCompensableLogger#createTransaction
    ,此时事务状态为javax.transaction.Status#STATUS_ACTIVE
  2. 调用远程方法前,拦截器执行远端资源入列TCC事务,执行方法CompensableTransactionImpl#enlistResource
    时调用logging.SampleCompensableLogger#createParticipant
    输出日志。此时序列化的是org.bytesoft.transaction.archive.XAResourceArchive
    对象。
  3. 当需要获取连接执行数据库操作时,首先会进行XA资源入列本地事务,通过通知器,最终会调用到方法CompensableTransactionImpl#onInvocationPhaseEnlistResource
    。在这里会为TCC归档设置本地事务ID和TCC事务ID。同时调用方法logging.SampleCompensableLogger#createCompensable
    将补偿归档对象org.bytesoft.compensable.archive.CompensableArchive
    进行输出。
  4. 当TCC方法内嵌套调用了其他的TCC方法,通过拦截器supports.spring.CompensableMethodInterceptor#invoke(org.aspectj.lang.ProceedingJoinPoint)
    执行拦截时,会将TCC调用注册到TCC事务中。此时也会将新的补偿归档对象输出到日志,通过方法logging.SampleCompensableLogger#createCompensable
  5. 当本地事务提交时,通知方法CompensableTransactionImpl#onCommitSuccess
    会被调用,本地事务的提交只是TCC事务提交中的一个步骤。因此此时TCC事务的事务状态已经改变,变为javax.transaction.Status#STATUS_COMMITTING
    。调用方法logging.SampleCompensableLogger#updateTransaction
    更新事务状态。
  6. 准备进行TCC事务的提交环节。进入到方法CompensableTransactionImpl#fireCommit
    时,首先会进行事务归档信息的日志输出,调用方法org.bytesoft.compensable.logging.CompensableLogger#updateTransaction
    .此时事务状态为javax.transaction.Status#STATUS_COMMITTING
  7. TCC事务的提交阶段,进行TCC操作的confirm阶段时,由于也存在单机事务的打开,本地XA资源入列,因此此时会触发通知方法CompensableTransactionImpl#onCompletionPhaseEnlistResource
    。在TCC的补偿阶段,是根据TCC事务中注册的补偿归档信息的逆序进行补偿的,执行补偿操作时,会设置补偿归档信息org.bytesoft.compensable.archive.CompensableArchive
    compensableResourceKey
    属性,这个更新通过日志输出,调用方法logging.SampleCompensableLogger#updateCompensable
  8. 当补偿操作执行完毕进行提交时,通知方法CompensableTransactionImpl#onCompletionPhaseCommitSuccess
    被触发。如果提交顺利,补偿归档信息CompensableArchive
    confirmed
    属性更新为true。这个补偿归档信息对象和第七步的是一样的。通过日志进行输出,调用方法logging.SampleCompensableLogger#updateCompensable
    进行输出。
  9. TCC事务中的本地方法的补偿操作执行完毕后,就开始执行远端方法的补偿操作。每一个远端资源执行完毕补偿操作后(也就是XAResource.commit方法),会设置资源归档信息org.bytesoft.transaction.archive.XAResourceArchive
    的一些状态信息,设置后通过方法logging.SampleCompensableLogger#updateParticipant
    进行日志输出。
  10. TCC事务提交的最后,将事务状态更新为javax.transaction.Status#STATUS_COMMITTED
    ,将事务归档信息进行输出,通过方法logging.SampleCompensableLogger#updateTransaction
  11. 当TCC事务成功提交后,会调用方法org.bytesoft.transaction.Transaction#forgetQuietly
    将不再需要的事务删除,会调用到方法logging.SampleCompensableLogger#deleteTransaction
    进行日志输出。

远端调用拦截

发起方

在补偿方法中调用远程方法时,通过拦截器去执行一些事务相关的操作,对应的接口是org.bytesoft.transaction.supports.rpc.TransactionInterceptor
。会涉及到两个方法

  • org.bytesoft.transaction.supports.rpc.TransactionInterceptor#beforeSendRequest
  • org.bytesoft.transaction.supports.rpc.TransactionInterceptor#afterReceiveResponse

在TCC中,对应的实现类是supports.rpc.CompensableInterceptorImpl
。拦截思路都是近似的,但是实现上各有所不同,具体来说就是不同的拦截器实现。比如通过feign,比如通过resttemplate等。

外部的拦截器需要构建接口org.bytesoft.transaction.supports.rpc.TransactionRequest
的实现类,更本质的,是需要能够提供org.bytesoft.transaction.remote.RemoteCoordinator
对象和org.bytesoft.transaction.TransactionContext
。后者简单,直接从当前的TCC事务中获取即可(如果有的话)。而前者通过方法supports.springcloud.SpringCloudBeanRegistry#getConsumeCoordinator
获取。该方法内部是通过JDK的动态代理生成了RemoteCoordinator
接口的实现对象,依靠的是supports.springcloud.SpringCloudCoordinator

发送前拦截

发送前拦截有两个逻辑:

  1. 克隆TCC事务的事务上下文对象,设置到TransactionRequest中。
  2. 取得本次调用的远端参与者信息,构建远端资源描述对象XAResourceArchive
    ,将对象入列到TCC事务中。

第一个步骤很简单就略过,来看第二个步骤。通过构建org.bytesoft.bytejta.supports.resource.RemoteResourceDescriptor
对象,作为org.bytesoft.transaction.remote.RemoteCoordinator
的代理对象,将RemoteResourceDescriptor
作为参数,执行方法CompensableTransactionImpl#enlistResource
。这种设计思路,实际上是将远端调用看成是一种XA资源,仍然是延续着XA的思路。资源入列后,按照XA的思路,会执行资源的start方法。这里需要生成一个针对本次调用的xid,其中全局ID部分和事务上下文的一致,分支标识符重新生成。上面提过,RemoteCoordinator
对象是通过JDK动态代理生成的,因此方法的调用最终会委托到supports.springcloud.SpringCloudCoordinator#invoke
。这个方法对start的实现是一个空实现,因为远端资源并不是真正的XA资源。

在start方法调用完毕后,构建org.bytesoft.transaction.archive.XAResourceArchive
对象,将上面的xid和RemoteCoordinator
对象设置到XAResourceArchive
的属性中,设置后将对象放入resourceList
resourceMap
中。放入成功时,将新建的XAResourceArchive
写入日志。远端资源入列TCC事务只需要保存一个节点信息,因此通过resourceMap
对节点信息进行去重。

如果远端资源还未被包括到TCC事务的远端资源列表中,则将远端资源包装为一个org.bytesoft.transaction.archive.XAResourceArchive
对象,该对象持有一个XID属性,就是上文构建的新的XID。将该XAResourceArchive
对象放入TCC事务的远端资源列表中。通过日志输出器将XAResourceArchive
输出到日志中。

收到响应后拦截

为了避免因为系统宕机造成远程服务的调用信息丢失,因此在发起远程调用前就先将远程调用注册为一个远端的资源入列到TCC事务。而如果发起远程调用的时候,判断出请求实际没有发出的话,则执行出列动作并写入日志,节省后续的处理资源。而如果请求确实到达了远端服务,则不需要做任何处理。

接收方

当服务端收到请求后,请求会经过拦截器supports.springcloud.web.CompensableHandlerInterceptor#preHandle
,这个拦截器如果检测到Http请求头包含X-BYTETCC-TRANSACTION
则就会执行TCC事务的相关判断,具体来说有:

  1. 从Http的header中获取TCC事务的序列化数据和TCC事务的传播者标识符。
  2. 反序列化TCC序列化数据得到TCC事务对象,设置propagated
    属性为true,设置propagatedBy
    属性为TCC事务的传播者标识符。从TCC事务的传播者标识符构建org.bytesoft.transaction.remote.RemoteCoordinator
    对象。使用这两个对象构建org.bytesoft.bytejta.supports.rpc.TransactionRequestImpl
    ,并且作为参数调用方法supports.springcloud.CompensableInterceptorImpl#afterReceiveRequest
    .
  3. 在响应的HttpHeader中设置X-BYTETCC-TRANSACTION
    为当前TCC事务的事务上下文的序列化值,设置X-BYTETCC-PROPAGATION
    为当前应用的标识符。

收到请求后拦截

收到请求后拦截最终委托到方法supports.rpc.CompensableInterceptorImpl#afterReceiveRequest
执行。该方法取得该应用内的TCC事务参与者对象,也就是CompensableCoordinator
,并且使用入参方法的事务上下文作为参数,启动事务,也就是调用方法CompensableCoordinator#start(org.bytesoft.transaction.TransactionContext, int)
。这个方法做了几个事情:

  1. 经过一系列检查无误,使用入参的事务上下文对象初始化TCC事务对象,并且放入事务仓库,写入日志。
  2. 事务上下文的传播层级+1,将该TCC事务与当前线程绑定。

总结来说,就是通过拦截器,发现当前的请求中包含了TCC事务的相关信息,因此初始化了一个TCC事务并且和当前线程绑定。

发送响应前拦截

服务提供者在处理完成业务后,发送Http响应。会经过拦截器supports.springcloud.web.CompensableHandlerInterceptor#afterCompletion
。该拦截器会委托方法supports.rpc.CompensableInterceptorImpl#beforeSendResponse
进行拦截。该方法而进一步委托方法CompensableCoordinator#end(org.bytesoft.transaction.TransactionContext, int)
进行,该方法主要逻辑是:

  1. 将TCC事务和当前线程解绑。
  2. 释放TCC事务对象的内部锁,前面在使用start开启事务时,内部锁被锁定了。
  3. 将事务上下文的传播层级-1。

恢复相关

系统重启后会执行日志的恢复工作,恢复的入口是类work.CompensableWork
。入口类是委托类TransactionRecoveryImpl
完成真正的恢复工作,调用方法org.bytesoft.transaction.TransactionRecovery#startRecovery
进行日志读取和恢复。该方法首先调用方法org.bytesoft.transaction.TransactionRecovery#startRecovery
进行事务恢复。在TCC中,本地事务依靠的单机事务进行持久性保证,因此不需要事务管理器自己再管理日志,因此其底层的日志实现是一个空实现。那么在这里也就没有执行具体的逻辑了。接着就到了TCC操作的恢复阶段,调用方法TransactionRecoveryImpl#fireCompensableStartRecovery
进行。

从日志恢复对应的TransactionArchive对象

TransactionRecoveryImpl#fireCompensableStartRecovery
方法通过调用org.bytesoft.compensable.logging.CompensableLogger#recover
来执行具体的日志恢复。该方法主要职责是从文件中恢复出对应的TransactionArchive对象,具体流程如下:

  1. 遍历文件,将每一个记录解析出来,将仍然激活的记录的xid(该xid只有全局ID)保存到一个set中(仍然激活的记录也就是没有执行过删除操作的事务)
  2. 遍历文件,将每一个记录解析出来,如果该记录的xid存在阶段1中的集合中,执行该记录原本对应的操作。执行操作的逻辑如图所示:
  3. 经过1和2两个步骤,此时XidMap应该包含了日志内相关的XId和对应的TransactionsArchive.遍历该XidMap的value集合,将数据传入回调接口org.bytesoft.transaction.recovery.TransactionRecoveryCallback#recover
    进行执行。

从TransactionArchive中执行恢复动作

方法首先会遍历TCC事务对象中的archiveList
属性,进行初步的状态修复,具体逻辑如下图

介绍下几种if的发生情况:

  • archive的tried属性为true:在本地事务提交后,会触发通知方法CompensableTransactionImpl#onCommitSuccess
    ,对archive的tried设置属性true并且通过日志输出
  • triedMap已经包含recordKey:TCC事务对象中的archiveList是按照TCC调用的顺序存放并且写入日志的,而事务提交的时候刷新archive信息也是按照这个顺序,可能出现前面的archive更新到日志中后系统宕机导致后面的archive没来得及更新到日志中,因此此时会出现这种情况。
  • 剩下情况:事务提交后系统就宕机,完全没有来得及写入日志。此时需要依靠和业务写入在一起的本地事务提供可靠性保证。

CleanupWork

run方法会进行主从文件的切换,切换成功后,对从文件进行清理和压缩。

CleanupFile

这里需要介绍两个重要的属性:

  1. recordMap:该属性以resourceId为key,记录着当前处于enable状态的CleanupRecord
  2. recordList:该列表中的元素和文件上的内容一一对应。

清理

CleanupFile
recordMap
中取得当前记录的CleanupRecord
信息,对相同resourceId的CleanupRecord
执行批量清理。清理流程的核心就是根据xid的值,从数据库bytejta进行数据删除。

删除成功的记录,设置对应的CleanupRecord
的删除标识符为真,也就是令recordFlag
的值与2进行并的位操作。

一次清理,会将CleanupFile
recordMap
的所有CleanupRecord
都完成清理工作。

压缩

对recordList进行遍历,执行以下操作:

  1. 如果记录是enabled & forgeted
    ,则执行删除操作,并且将record
    加入到removedList
    中。删除操作主要有:
    1. 设置record的enable属性为false,设置flag值为0
    2. 找到record在文件中的位置,将标识符的byte设置为0
    3. CleanupFile
      的recordMap中删除该record
  2. 大幅度

文件格式

CleanUp文件格式

头部

序号内容长度(字节)
1resource.cleanup定长,字符串长度
2主版本号,值为01
3副版本号,值为21
4是否master文件,0:否;1:是;2:准备中,该值意味着该准备即将成为主文件1
5文件内容体开始偏移量,int类型,值为常量,为前4个内容的长度4
6文件内容体的结束偏移量,int类型4

内容体

序号内容长度
1状态标识符,第一个bit是enable标识位,第二个bit是删除标识位。1
2XID中的全局ID16
3XID中的分支标识符16
4资源名称23

补偿日志格式

头部

序号内容长度
1标识符,固定字符串“logging.sample”标识符文本长度
2文件主版本号,默认值01
3文件副版本号,默认值21
4文件创建时间,long表达的毫秒数8
5内容体起始坐标,int数字4
6准备切换为主文件标识位,1:是;0:否1
7是否主文件标识位,1:是;0:否1
8文件修改时间,long表达的毫秒数8
9文件内容替结束坐标,int数字4

内容体

序号内容长度
1xid中全局ID的字节表达16
2操作符,1:创建;2:修改;3:删除1
3二进制内容体长度,int表达4
4内容体由序号3的内容定义

设计思想

ByteTCC是一个事务管理器组件,实现了JTA接口。通过介入事务开启,使用资源,提交事务等环节,进而实现对TCC流程的控制。而在每一个对事务总体有影响的变化步骤执行前,都会留下日志,供重启进行恢复。


ByteTCC将整个TCC事务分为两个阶段:尝试阶段(try)和完成阶段(complete)。其中完成阶段存在两个走向:confirm和cancel。具体走向的选择是根据try阶段是否能否正常完成来决定的。但是完成阶段执行的内容都是大体相似,都是执行对应的操作,并且在操作执行完毕后,删除执行完毕后的TCC事务。


一个TCC事务任意时间内和指定线程绑定的本地事务最多只有一个,但是整个TCC事务的尝试阶段中,可能会开启提交多个本地事务。一个本地事务可能作用于多于一个的TCC调用。


TCC事务的传播层级会随着TCC调用而进行增加。最初的TCC调用产生时,传播层级为0。


TCC事务的协调者本身才能修改TCC事务的状态,参与者只能更新涉及到自身的TCC归档的状态


ByteTCC不支持多个上级节点。

TCC事务的处理层级是事务,反应到应用上,就是实例层级而非方法层级。假设出现以下的调用顺序:

client->A->B1->C->B2->D

B1和B2分别是B应用上的两个不同方法

在B这个应用节点上,其TCC事务内部都包含个两个TCC归档信息,分别是B1和B2。如果全局事务提交时,A会向B发出commit指令,此时B节点会将内部的TCC归档信息遍历,然后执行提交动作,提交动作中会触发下一个节点的commit。而C节点处理本地提交后又会触发远端提交,此时再次提交到B上,形成死循环。因此多个上级节点会导致错误。


在调用方法org.bytesoft.bytetcc.CompensableTransactionImpl#participantCommit
前需要先调用org.bytesoft.bytetcc.CompensableTransactionImpl#lock
。由于远程调用的并发可能性,因此由上一个节点发起的commit请求需要通过加锁来保证正确性。

细节相关

XID相关

在一开始生成事务上下文的时候,XID当中只有formatID
globalID
有值。而当执行具体的操作的时候,比如资源入列到事务,注册补偿操作等等,此时需要创建新的XID,会使用继承事务上下文中的globalID
,并且创建全新的分支标识符。


文章转载自风火说,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论