暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

海山数据库(He3DB)源码解读:SubBegin-Rollback

Eric 2024-11-18
75

# 海山数据库(He3DB)源码详解:子事务块内核函数执行过程

本文介绍了子事务执行过程中,从SAVEPOINT定义子事务开始到子事务结束释放RELEASE或者回滚ROLLBACK To的内核函数执行过程。

处理TRANS_STMT_SAVEPOINT

当终端输入SAVEPOINT XX语句时,这时会发起一个子事务保存点定义的操作。首先会调用RequireTransactionBlock函数检查当前事务的状态,之后再调用DefineSavepoint函数启动一个子事务节点,加入到事务链栈中。

RequireTransactionBlock()函数执行过程:

RequireTransactionBlock()函数会调用CheckTransactionBlock()函数完成对事务状态的检查,具体的CheckTransactionBlock()函数执行过程如下:

  1. 调用IsTransactionBlock()检查全局结构体变量CurrentTransactionState中的blockState成员值,如果blockState等于TBLOCK_DEFAULT或者TBLOCK_STARTED,表示不存在事务块,返回false,其他返回true:
if (IsTransactionBlock()) return;
  1. 调用IsSubTransaction()检查全局结构体变量CurrentTransactionState中的nestingLevel成员值,如果≥2表示有子事务嵌套,返回true,否则返回false:
if (IsSubTransaction()) return;
  1. 判断当前事务是否为顶层事务,如果不是顶层事务直接返回退出。
if (!isTopLevel) return;
  1. 如果以上检查都没有通过,则根据输入参数throwError的值输出ERROR错误还是WARING错误。
ereport(throwError ? ERROR : WARNING, (errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION), /* translator: %s represents an SQL statement name */ errmsg("%s can only be used in transaction blocks", stmtType)));

DefineSavepoint()函数执行过程:

该函数和其他事务块操作函数十分相似,内部主要是一个swith结构,说执行流程如下:

在这里插入图片描述

  1. 通过全局状态变量获取当前事务状态,并检查是否为并行事务模式,如果时并行事务则直接报错。
TransactionState s = CurrentTransactionState; /* * Workers synchronize transaction state at the beginning of each * parallel operation, so we can't account for new subtransactions * after that point. * (Note that this check will certainly error out if s->blockState * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid * case below.) */ if (IsInParallelMode()) ereport(ERROR, (errcode(ERRCODE_INVALID_TRANSACTION_STATE), errmsg("cannot define savepoints during a parallel operation")));
  1. 通过swtich-case结构,同时完成事务块检查和子事务创建任务。
  • 事务块为TBLOCK_INPROGRESS或TBLOCK_SUBINPROGRESS状态:正常状态,通过PushTransaction()函数创建子事务切换事务状态,并为子事务name申请内存空间并命名。
case TBLOCK_INPROGRESS: case TBLOCK_SUBINPROGRESS: /* Normal subtransaction start */ PushTransaction(); s = CurrentTransactionState; /* changed by push */ /* * Savepoint names, like the TransactionState block itself, live * in TopTransactionContext. */ if (name) s->name = MemoryContextStrdup(TopTransactionContext, name); break;
  • 事务块为TBLOCK_IMPLICIT_INPROGRESS状态:错误状态,事务处在隐式状态,即事务完成会自动提交或者出错自动放弃,定义子事务会与这个目的冲突。(子事务可以回滚放弃部分操作,但是隐式事务要求回滚放弃全部操作)
case TBLOCK_IMPLICIT_INPROGRESS: ereport(ERROR, (errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION), /* translator: %s represents an SQL statement name */ errmsg("%s can only be used in transaction blocks", "SAVEPOINT"))); break;
  • 事务块处在上述三种状态之外的其他状态:这些状态均是无效的状态,提交FATAL日志。
/* These cases are invalid. */ case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: case TBLOCK_SUBCOMMIT: case TBLOCK_ABORT: case TBLOCK_SUBABORT: case TBLOCK_ABORT_END: case TBLOCK_SUBABORT_END: case TBLOCK_ABORT_PENDING: case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: case TBLOCK_SUBABORT_RESTART: case TBLOCK_PREPARE: elog(FATAL, "DefineSavepoint: unexpected state %s", BlockStateAsString(s->blockState)); break;

处理TRANS_STMT_RELEASE

当终端输入RELEASE XX语句时,这时会发起一个子事务保存点释放的操作。首先会调用RequireTransactionBlock函数检查当前事务的状态,之后再调用ReleaseSavepoint函数修改目标子事务节点的事务块状态。

RequireTransactionBlock函数的处理过程与SAVEPOINT过程相同,不做更多的阐述。

ReleaseSavepoint()函数的执行过程:

该函数和其他事务块操作函数类似,内部有一个swith结构,在此之后会修改从当前子事务到目标事务直接的事务块状态,具体执行流程如下:

在这里插入图片描述

  1. 首先,从全局变量中获得当前事务状态,并申请两个相同的变量;之后检查当前事务是否为并行事务模式。
TransactionState s = CurrentTransactionState; TransactionState target, xact; if (IsInParallelMode()) ereport(ERROR, (errcode(ERRCODE_INVALID_TRANSACTION_STATE), errmsg("cannot release savepoints during a parallel operation")));
  1. 通过swtich-case结构,完成事务块状态检查任务,在这个结构中,除了事务块状态为TBLOCK_SUBINPROGRESS可以正常退出,其他都会报错(TBLOCK_INPROGRESS和TBLOCK_IMPLICIT_INPROGRESS两个状态报ERROR,其他报FATAL)。

  2. 通过for循环,从当前节点回溯寻找目标节点。PointerIsValid()用于判断输入的指针是否为NULL。如果节点中成员name存在且等于目标的name,那么找到目标节点,退出循环。

for (target = s; PointerIsValid(target); target = target->parent) { if (PointerIsValid(target->name) && strcmp(target->name, name) == 0) break; }
  1. 检查目标节点的有效性和保存等级。
if (!PointerIsValid(target)) ereport(ERROR, (errcode(ERRCODE_S_E_INVALID_SPECIFICATION), errmsg("savepoint \"%s\" does not exist", name))); /* disallow crossing savepoint level boundaries */ if (target->savepointLevel != s->savepointLevel) ereport(ERROR, (errcode(ERRCODE_S_E_INVALID_SPECIFICATION), errmsg("savepoint \"%s\" does not exist within current savepoint level", name)));
  1. 临时变量xact获取当前节点的事务状态值,并通过死循环修改从当前到目标值的事务块状态,直到遇到目标值退出。
xact = CurrentTransactionState; for (;;) { Assert(xact->blockState == TBLOCK_SUBINPROGRESS); xact->blockState = TBLOCK_SUBRELEASE; if (xact == target) break; xact = xact->parent; Assert(PointerIsValid(xact)); }

处理TRANS_STMT_ROLLBACK_TO

当终端输入ROLLBACK TO XX语句时,这时会发起一个子事务保存点回滚的操作。首先会调用RequireTransactionBlock函数检查当前事务的状态,之后再调用RollbackToSavepoint函数修改目标子事务节点的事务块状态。

RequireTransactionBlock函数的处理过程与SAVEPOINT过程相同,不做更多的阐述。

RollbackToSavepoint()函数的执行过程:

在这里插入图片描述

  1. 设定事务状态,获取当前事务状态,创建目标父节点和临时事务状态变量,并检查当前是否是并行事务模式。
TransactionState s = CurrentTransactionState; TransactionState target, xact; if (IsInParallelMode()) ereport(ERROR, (errcode(ERRCODE_INVALID_TRANSACTION_STATE), errmsg("cannot rollback to savepoints during a parallel operation")));
  1. 检查事务块状态
    通过switch-case结构检查当前事务块的状态,和ReleaseSavepoint()函数相似,除了处于TBLOCK_SUBINPROGRESS或TBLOCK_SUBABORT状态,可以正确退出,其余都会报错。
/* We can't rollback to a savepoint if there is no savepoint defined.*/ case TBLOCK_INPROGRESS: case TBLOCK_ABORT: ereport(ERROR, (errcode(ERRCODE_S_E_INVALID_SPECIFICATION), errmsg("savepoint \"%s\" does not exist", name))); break; case TBLOCK_IMPLICIT_INPROGRESS: ereport(ERROR, (errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION), errmsg("%s can only be used in transaction blocks", "ROLLBACK TO SAVEPOINT"))); break; /* There is at least one savepoint, so proceed. */ case TBLOCK_SUBINPROGRESS: case TBLOCK_SUBABORT: break; /* These cases are invalid. */ case TBLOCK_DEFAULT: /* ... */ elog(FATAL, "RollbackToSavepoint: unexpected state %s", BlockStateAsString(s->blockState)); break;
  1. 回溯寻找目标节点。通过for循环回溯链栈,寻找目标节点target。
for (target = s; PointerIsValid(target); target = target->parent) { if (PointerIsValid(target->name) && strcmp(target->name, name) == 0) break; }
  • PointerIsValid本质为宏定义,作用为判断参数是否为NULL;
  • 判断target->name是否为空,且是否等于目标节点的name。如果为True,则跳出for循环;
  1. 检查目标节点target的状态,同时检查保存点等级。
if (!PointerIsValid(target)) ereport(ERROR, (errcode(ERRCODE_S_E_INVALID_SPECIFICATION), errmsg("savepoint \"%s\" does not exist", name))); /* disallow crossing savepoint level boundaries */ if (target->savepointLevel != s->savepointLevel) ereport(ERROR, (errcode(ERRCODE_S_E_INVALID_SPECIFICATION), errmsg("savepoint \"%s\" does not exist within current savepoint level", name)));
  1. 修改目标点之前的状态,将当前事务到目标节点之间的状态标记为ABORT PENDING或ABORT END状态。
xact = CurrentTransactionState; for (;;) { if (xact == target) break; if (xact->blockState == TBLOCK_SUBINPROGRESS) xact->blockState = TBLOCK_SUBABORT_PENDING; else if (xact->blockState == TBLOCK_SUBABORT) xact->blockState = TBLOCK_SUBABORT_END; else elog(FATAL, "RollbackToSavepoint: unexpected state %s", BlockStateAsString(xact->blockState)); xact = xact->parent; Assert(PointerIsValid(xact)); }
  1. 修改xact作为目标节点的事务块状态。
/* And mark the target as "restart pending" */ if (xact->blockState == TBLOCK_SUBINPROGRESS) xact->blockState = TBLOCK_SUBRESTART; else if (xact->blockState == TBLOCK_SUBABORT) xact->blockState = TBLOCK_SUBABORT_RESTART; else elog(FATAL, "RollbackToSavepoint: unexpected state %s", BlockStateAsString(xact->blockState));
  • 如果临时变量xact的blockState为TBLOCK_SUBINPROGRESS,将事务块状态修改为TBLOCK_SUBRESTART;
  • 如果临时变量xact的blockState为TBLOCK_SUBABORT,将事务块状态修改为TBLOCK_SUBABORT_RESTART;
  • 如果临时变量xact的事务块状态为其他状态,则提交FATAL日志;

完成RollbackToSavepoint函数。

		 BlockStateAsString(xact->blockState));
   - 如果临时变量xact的blockState为TBLOCK_SUBINPROGRESS,将事务块状态修改为TBLOCK_SUBRESTART;
   - 如果临时变量xact的blockState为TBLOCK_SUBABORT,将事务块状态修改为TBLOCK_SUBABORT_RESTART;
   - 如果临时变量xact的事务块状态为其他状态,则提交FATAL日志;

完成RollbackToSavepoint函数。






「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论