暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Postgres逻辑复制tablesync.c源码分析

原创 Yongtao 2023-04-20
276

逻辑复制的两个worker

逻辑复制事实上有两个worker在工作

  • apply worker:真正干活的,负责把主机中的change应用到备机
  • sync worker:负责同步主机和备机中表的数据

本文主要关注sync worker的行为

Sync worker的状态

/* ---------------- * substate constants * ---------------- */ #define SUBREL_STATE_INIT 'i' /* initializing (sublsn NULL) */ #define SUBREL_STATE_DATASYNC 'd' /* data is being synchronized (sublsn NULL) */ #define SUBREL_STATE_SYNCDONE 's' /* synchronization finished in front of apply (sublsn set) */ #define SUBREL_STATE_READY 'r' /* ready (sublsn set) */ /* These are never stored in the catalog, we only use them for IPC. */ #define SUBREL_STATE_UNKNOWN '\0' /* unknown state */ #define SUBREL_STATE_SYNCWAIT 'w' /* waiting for sync */ #define SUBREL_STATE_CATCHUP 'c' /* catching up with apply */

下图是Sync worker的状态转移图:
machine state.png
创建逻辑订阅的命令如下:

CREATE SUBSCRIPTION abc WITH (copy_data=true)

如果(copy_data=true),那么则需要先同步表数据。如果(copy_data=false),那么只需要同步data change。

两个Worker的主要函数调用关系

ApplyWorkerMain(Datum main_arg) /* Logical Replication Apply worker entry point */ LogicalRepSyncTableStart(&origin_startpos); copy_table(rel); fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), RelationGetRelationName(rel), &lrel); make_copy_attnamelist(relmapentry); BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, NULL,attnamelist, NIL); wait_for_worker_state_change(SUBREL_STATE_CATCHUP); /* Wait for main apply worker to tell us to catchup. */ LogicalRepApplyLoop(origin_startpos); /* Run the main loop. */ apply_dispatch(&s); apply_handle_commit(s); process_syncing_tables(commit_data.end_lsn); process_syncing_tables_for_sync(current_lsn); process_syncing_tables_for_apply(current_lsn); wait_for_relation_state_change(rstate->relid, SUBREL_STATE_SYNCDONE); CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,invalidate_syncing_table_states, (Datum) 0); /* Setup callback for syscache so that we know when something changes in the subscription relation state. */

逻辑表同步源码分析

char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) { /* 获取订阅表信息 */ relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, &relstate_lsn, true); switch (MyLogicalRepWorker->relstate) /* 判断状态 */ { case SUBREL_STATE_DATASYNC: // 同步正在进行中 StartTransactionCommand(); // 和下方的CCI形成一个事务包裹 rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock); // 打开表 walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, true, CRS_USE_SNAPSHOT, origin_startpos); // 在这个事务内,创建一个临时的slot,此处有一个快照 PushActiveSnapshot(GetTransactionSnapshot()); copy_table(rel); // 把这个表复制过去,基于COPY命令,数据拷贝到之前的快照的位置 PopActiveSnapshot(); table_close(rel, NoLock); // 关闭表 CommandCounterIncrement(); // 一个事务包裹 case SUBREL_STATE_SYNCDONE: // 同步结束 finish_sync_worker(); // 结束同步worker break; } }
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论