暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

海山数据库(He3DB)源码详解:StartTransaction函数

wukong 2024-08-21
20

Table of Contents

海山数据库(He3DB)源码详解:StartTransaction函数

本文介绍了事务提交过程中,具体执行提交任务的StartTransaction函数详细执行流程。

1. 执行条件

存在三种调用此函数的场景:

  1. 事务开始状态下,在事务处于TRANS_DEFAULT状态且事务块处于TBLOCK_DEFAULT状态下,由StartTransactionCommand函数调用StartTransaction函数完成事务开始的操作。
  2. 事务提交后,在CommitTransactionCommand函数中,事务块状态分别为TBLOCK_END、TBLOCK_ABORT_END、TBLOCK_ABORT_PENDING三种状态下,且事务状态参数chain为true时,会调用StartTransaction函数。
  3. 并行事务条件下,在ParallelWorkerMain函数中由StartParallelWorkerTransaction调用StartTransaction函数。

2. 执行过程

2.1 当前状态设定:

申请事务状态变量TransactionState s和虚拟事务ID变量VirtualTransactionId vxid,

并将顶部事务状态复制给s,将事务状态变量复制给全局变量CurrentTransactionState。

TransactionState s; VirtualTransactionId vxid; /* * Let's just make sure the state stack is empty */ s = &TopTransactionStateData; CurrentTransactionState = s;

因为事务刚刚开始,新的事务状态应该与Top事务相同,且当前只有刚启动的事务,所以将事务状态变量s复制给CurrentTransactionState。(顺序为:top->s->current)。

  1. TopTransactionState主要设定了刚开始事务时,初始状态下top的事务和事务块状态,以及对应Xid的日志是否提交。
  2. CurrentTransactionState主要保存了当前事务状态相关的参数,主要是事务状态、事务块状态、XID、保存点、父节点等级等一系列参数。

2.2 检查当前状态:

申请事务状态变量TransactionState s和虚拟事务ID变量VirtualTransactionId vxid,

并将顶部事务状态复制给s,将事务状态变量复制给全局变量CurrentTransactionState。

Assert(!FullTransactionIdIsValid(XactTopFullTransactionId)); /* check the current transaction state */ Assert(s->state == TRANS_DEFAULT);

这里主要判断,事务开始时Xid和事务状态是否为合理的状态。因为事务刚开始,Xid应该为0且事务状态应该为TRANS_DEFAULT。保证事务开始时,事务栈中的状态正确。

2.3 开始事务初始化:

将事务状态修改为TRANS_START,随后开始事务的初始操作,

主要对事务状态变量TransactionState s中的对应参数进行初始化复制。

s->state = TRANS_START; s->fullTransactionId = InvalidFullTransactionId; /* until assigned */ /* Determine if statements are logged in this transaction */ xact_is_sampled = log_xact_sample_rate != 0 && (log_xact_sample_rate == 1 || pg_prng_double(&pg_global_prng_state) <= log_xact_sample_rate); /* * initialize current transaction state fields * note: prevXactReadOnly is not used at the outermost level */ s->nestingLevel = 1; s->gucNestLevel = 1; s->childXids = NULL; s->nChildXids = 0; s->maxChildXids = 0; /* * Once the current user ID and the security context flags are fetched, * both will be properly reset even if transaction startup fails. */ GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext); /* SecurityRestrictionContext should never be set outside a transaction */ Assert(s->prevSecContext == 0);
  1. 指定初始的fullTransactionId;
  2. 判断是否要提交日志。如果提交,提交的频率是多少。
  3. 初始结构体变量TransactionState s;包括事务嵌套等级、GUC上下文、子事务相关参数。
  4. GetUserIdAndSecContext()函数获取prevUser和prevSecContext两个变量并做复制,保证了事务启动失败时,能恢复到一个之前的安全状态。
  5. 这个Assert就是判断一下事务开始时,前一个安全状态是不是空。(没启动之前就应该是空)

检查当前系统是否处于一个Recover的阶段。

因为,系统可能会因为之前的崩溃导致系统后台还在一个恢复过程,虽然你可以进入并使用数据库,但是这个时候你不能进行写入,只能读取数据。

if (RecoveryInProgress()) { s->startedInRecovery = true; XactReadOnly = true; } else { s->startedInRecovery = false; XactReadOnly = DefaultXactReadOnly; }

RecoveryInProgress函数首先会检查LocalRecoveryInProgress全局变量。默认为true,表示不知道当前系统状态,需要检查XlogCtl中的SharedRecoveryState变量。如果SharedRecoveryState的值为RECOVERY_STATE_DONE,就将LocalRecoveryInProgress设置为false,RecoveryInProgress返回false,系统可读写,不在恢复状态中。

事务是否串行化、事务隔离等级,是否强制同步提交,事务当前标记。

XactDeferrable = DefaultXactDeferrable; XactIsoLevel = DefaultXactIsoLevel; forceSyncCommit = false; MyXactFlags = 0;

正确情况下,不需要事务串行化(串行化会强制事务按照顺序执行,降低数据库读写效率,但是可以解决幻读问题)。事务隔离等级一般为Read Commit。forceSyncCommit会强制事务按照同步的方式提交。MyXactFlags为事务标志,当前没有操作所以为0。

进行子事务相关的初始化。

s->subTransactionId = TopSubTransactionId; currentSubTransactionId = TopSubTransactionId; currentCommandId = FirstCommandId; currentCommandIdUsed = false;

初始化Xlog相关的东西。

nUnreportedXids = 0; // 未记录的Xid s->didLogXid = false; // WAL是否记录在log中

初始化配置内存memory和ResourceOwner。

AtStart_Memory(); AtStart_ResourceOwner();
  1. AtStart_Memory函数:首先,从TopMemoryContext申请AbortTransaction函数的内存上下文TransactionAbortContext,保证出错可以正确执行Abort过程;然后,从TopMemoryContext申请TopMemoryContext的内容,再把CurTransactionContext设置为top;最后,切换上下文到CurTransactionContext。完成上下文内存申请。
  2. AtStart_ResourceOwner函数:首先,为当前事务状态结构中的curTransactionOwner创建一个Owenr,然后,将这个Owner分别设置为TopTransactionResourceOwner、CurTransactionResourceOwner和CurrentResourceOwner。完成ResourceOwner配置

配置后台进程ID和虚拟事务。

获取一个新的LocalTransactionId,最后将vxid给到一个可用的MyProc中。

vxid.backendId = MyBackendId; vxid.localTransactionId = GetNextLocalTransactionId(); /* * Lock the virtual transaction id before we announce it in the proc array */ VirtualXactLockTableInsert(vxid);
  1. MyBackendId会在InitPostgres中进行初始化。
  2. GetNextLocalTransactionId会通过一个do…while循环获取一个LocalTransactionId,只要LocalTransactionId不是无效的都可以用。
  3. VirtualXactLockTableInsert这里主要是将vxid.localTransactionId给到一个可用的MyProc中,MyProc是一个PGPROC结构体,在全局存在一个PGPROC链表,用于进程间的通信,后台对某一个MyProc的跟踪和管理,对一个事务来说,事务的start、commit和abort都通过MyProc变量来通知后台。

判断MyProc和vxid的backendId是否对应,并将LocalTransactionId复制给lxid。

Assert(MyProc->backendId == vxid.backendId); MyProc->lxid = vxid.localTransactionId;

事务的开始和结束时间的时间戳设置。

这里要判断事务是否是一个并行化操作的事务,并进一步根据SPI模式进行事务开始的时间戳设置。

if (!IsParallelWorker()) { if (!SPI_inside_nonatomic_context()) xactStartTimestamp = stmtStartTimestamp; else xactStartTimestamp = GetCurrentTimestamp(); } else Assert(xactStartTimestamp != 0); pgstat_report_xact_timestamp(xactStartTimestamp); /* Mark xactStopTimestamp as unset. */ xactStopTimestamp = 0;
  1. 如果是并行化worker,这时已经不需要获取只需要Assert判断,因为已经通过调用SetParallelStartTimestamps()函数获取到了开始时间的时间戳。
  2. 如果不是并行化worker,且如果是SPI的方式,就需要获取当前的时间戳,而不是语句开始的时间戳;如果不是SPI的方式,就直接获取当前语句的开始时间戳。
  3. pgstat_report_xact_timestamp想日志记录开始时间戳,标记结束时间戳为0;

初始化GUC、Cache和延迟触发器。

/* * initialize other subsystems for new transaction */ AtStart_GUC(); AtStart_Cache(); AfterTriggerBeginXact();

3. 完成事务启动

事务开始过程结束,修改事务为TRANS_INPROGRESS,并记录一条日志。

/* * done with start processing, set current transaction state to "in * progress" */ s->state = TRANS_INPROGRESS; ShowTransactionState("StartTransaction");

作者介绍

李超,移动云数据库工程师,负责云原生数据库He3DB的研发。

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论