逻辑复制的两个worker
逻辑复制事实上有两个worker在工作
- apply worker:真正干活的,负责把主机中的change应用到备机
- sync worker:负责同步主机和备机中表的数据
本文主要关注sync worker的行为
Sync worker的状态
/* ----------------
* substate constants
* ----------------
*/
#define SUBREL_STATE_INIT 'i' /* initializing (sublsn NULL) */
#define SUBREL_STATE_DATASYNC 'd' /* data is being synchronized (sublsn NULL) */
#define SUBREL_STATE_SYNCDONE 's' /* synchronization finished in front of apply (sublsn set) */
#define SUBREL_STATE_READY 'r' /* ready (sublsn set) */
/* These are never stored in the catalog, we only use them for IPC. */
#define SUBREL_STATE_UNKNOWN '\0' /* unknown state */
#define SUBREL_STATE_SYNCWAIT 'w' /* waiting for sync */
#define SUBREL_STATE_CATCHUP 'c' /* catching up with apply */
下图是Sync worker的状态转移图:

创建逻辑订阅的命令如下:
CREATE SUBSCRIPTION abc WITH (copy_data=true)
如果(copy_data=true),那么则需要先同步表数据。如果(copy_data=false),那么只需要同步data change。
两个Worker的主要函数调用关系
ApplyWorkerMain(Datum main_arg) /* Logical Replication Apply worker entry point */
LogicalRepSyncTableStart(&origin_startpos);
copy_table(rel);
fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), RelationGetRelationName(rel), &lrel);
make_copy_attnamelist(relmapentry);
BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, NULL,attnamelist, NIL);
wait_for_worker_state_change(SUBREL_STATE_CATCHUP); /* Wait for main apply worker to tell us to catchup. */
LogicalRepApplyLoop(origin_startpos); /* Run the main loop. */
apply_dispatch(&s);
apply_handle_commit(s);
process_syncing_tables(commit_data.end_lsn);
process_syncing_tables_for_sync(current_lsn);
process_syncing_tables_for_apply(current_lsn);
wait_for_relation_state_change(rstate->relid, SUBREL_STATE_SYNCDONE);
CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,invalidate_syncing_table_states, (Datum) 0); /* Setup callback for syscache so that we know when something changes in the subscription relation state. */
逻辑表同步源码分析
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
/* 获取订阅表信息 */
relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
&relstate_lsn, true);
switch (MyLogicalRepWorker->relstate) /* 判断状态 */
{
case SUBREL_STATE_DATASYNC: // 同步正在进行中
StartTransactionCommand(); // 和下方的CCI形成一个事务包裹
rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock); // 打开表
walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, true,
CRS_USE_SNAPSHOT, origin_startpos); // 在这个事务内,创建一个临时的slot,此处有一个快照
PushActiveSnapshot(GetTransactionSnapshot());
copy_table(rel); // 把这个表复制过去,基于COPY命令,数据拷贝到之前的快照的位置
PopActiveSnapshot();
table_close(rel, NoLock); // 关闭表
CommandCounterIncrement(); // 一个事务包裹
case SUBREL_STATE_SYNCDONE: // 同步结束
finish_sync_worker(); // 结束同步worker
break;
}
}
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




