背景
Citus 是一个 PG 的扩展,在不修改PG内核的情况下,可以将 PG 转换为分布式数据库。Citus 支持多种负载,包括多租户应用程序、实时分析、高频交易和时间序列或物联网数据的实时分析。在分布式数据库中,分布式事务是一个重要的功能特性,这里对citus的分布式事务流程进行介绍。
分布式事务处理流程
参考代码流程
CommitTransaction
-> CallXactCallbacks(XACT_EVENT_PRE_COMMIT)
-> CoordinatedTransactionCallback(XACT_EVENT_PRE_COMMIT)
-> CoordinatedRemoteTransactionsPrepare
-> StartRemoteTransactionPrepare
-> LogTransactionRecord -- 写入pg_dist_transaction
-> SendRemoteCommand(PREPARE TRANSACTION)
-> RecordTransactionCommit()
-> CallXactCallbacks(XACT_EVENT_COMMIT)
-> CoordinatedTransactionCallback(XACT_EVENT_COMMIT)
-> CoordinatedRemoteTransactionsCommit
-> StartRemoteTransactionCommit
-> SendRemoteCommand(COMMIT PREPARED)
从代码中可以看到:
citus的分布式事务,当事务提交时,首先向pg_dist_transaction中预写入记录(但不会马上提交),然后向所有worker发送prepare transaction,然后cn进行RecordTransactionCommit() 把pg_dist_transaction内容持久化,然后XACT_EVENT_COMMIT时,会进行commit prepared。更多细节可以参考上面列出的代码执行流程进行查看。
故障恢复流程
- citus的两阶段事务recovery只运行在cn上。
- 每次recovery 2pc,都是一个大事务,即 StartTransactionCommand -> RecoverTwoPhaseCommits -> CommitTransactionCommand ,这个大事务可以保证
- 打开的锁如果碰到错误可以自动回滚
- 本次对所有worker涉及到的pg_dist_transaction的修改要么都成功,要么都失败
- dn节点顺序进行恢复的
- 对每个节点的prepared txn处理的流程:
a) 首先去每个worker上,看pg_prepared_xacts事务,了解prepared transaction的情况,得到哪些事务已经prepared
参考查询语句:SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\%d\%%', coordinatorId
b) 然后在cn上看哪些事务是 inprogress的。 如果对worker中已经prepared的,但cn上还是inprogress的,说明暂时不需要处理这些事务的,这些事务还在正常执行中。
会根据ProcGlobal->allProcs中的信息得到inprogess的事务的transactionNumber
c) 然后在cn上看 pg_dist_transaction 表,如果有事务在 pg_dist_transaction 中,说明是需要commit transaction的,如果worker上有prepared 事务,但没有在pg_dist_transaction中看到,那就是需要rollback prepared的。 如果pg_dist_transaction中的记录没有找到worker上对应记录的,可能因为worker上已经提交了,只是还没来得及清理,所以要把pg_dist_transaction中的记录删除(
d) 再去每个worker上看一遍 prepared transaction的情况,与a步骤做的事情一致。
查看相关系统表信息
citus在cn上维护一张表 pg_dist_transaction ,groupid 就是workerid, gid就是两阶段事务的名称。
以一个2pc事务为例,有一个协调节点cn,两个woker节点。 如果有隐式2pc事务,则在cn上的pg_dist_transaction会有两行,每行对应各自worker上的2pc。
-- 在cn上
postgres=# select * from pg_dist_transaction;
groupid | gid
---------+---------------------
2 | citus_0_18125_13_10
3 | citus_0_18125_13_11
(2 rows)
-- 在worker1 上
postgres=# select * from pg_prepared_xacts;
-[ RECORD 1 ]------------------------------
transaction | 506
gid | citus_0_18125_13_10
prepared | 2025-04-20 17:38:26.998543+08
owner | citus
database | postgres
-- 在worker2 上
postgres=# select * from pg_prepared_xacts;
-[ RECORD 1 ]------------------------------
transaction | 505
gid | citus_0_18125_13_11
prepared | 2025-04-20 17:38:27.000953+08
owner | citus
database | postgres
gid的格式
* citus_<source group>_<pid>_<distributed transaction number>_<connection number>
*
* (at most 5+1+10+1+10+1+20+1+10 = 59 characters, while limit is 64)
*
* The source group is used to distinguish 2PCs started by different
* coordinators. A coordinator will only attempt to recover its own 2PCs.
*
* The pid is used to distinguish different processes on the coordinator, mainly
* to provide some entropy across restarts.
*
* The distributed transaction number is used to distinguish different
* transactions originating from the same node (since restart).
*
* The connection number is used to distinguish connections made to a node
* within the same transaction.
总结
Citus 通过使用 PG 的扩展机制,实现了分布式事务的处理。其事务处理流程遵循标准的两阶段提交(2PC)协议,确保了分布式事务的原子性。Citus 的故障恢复机制进一步增强了事务处理的可靠性,使得它能够在分布式环境中高效地处理事务。




