暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

海山数据库(He3DB)技术分享:同步复制

dawn1221 2024-08-13
85

同步复制

概念

He3DB 是一个先进的开源关系数据库管理系统,支持多种复制模式,其中同步复制是一种重要的高可用性解决方案。同步复制的核心目的是确保主节点(primary)和一个或多个备用节点(standby)之间的数据一致性。
主节点(Primary):
负责处理客户端的读写请求。
数据更改操作会记录在WAL(Write-Ahead Log)日志中。
备用节点(Standby):
从主节点接收并应用WAL日志,以保持数据同步。
可以是只读的(hot standby),用于查询和负载均衡。

核心结构体

  1. XLogRecPtr:WAL日志的逻辑位置指针,表示当前WAL日志的位置,用于标识日志记录的位置。
typedef uint64 XLogRecPtr;
  1. XLogRecord:定义在 src/include/access/xlogrecord.h 文件中,表示单个WAL记录的数据结构。它包含了事务ID(Xid)、WAL类型(如插入、删除等)和变更数据。
typedef struct XLogRecord { uint32 xl_tot_len; /* total len of entire record */ TransactionId xl_xid; /* xact id */ XLogRecPtr xl_prev; /* ptr to previous record in log */ uint8 xl_info; /* flag bits, see below */ RmgrId xl_rmid; /* resource manager for this record */ pg_crc32c xl_crc; /* CRC for this record */ } XLogRecord;
  1. Latch:
    latch 是一个同步机制,用于保护共享资源并保证在多线程或多进程环境中的一致性。Latch 是一种简单的锁机制,它确保在一个线程或进程修改共享数据结构时,其它线程或进程不会访问或修改相同的数据结构,从而避免数据竞争和损坏。
    特点:
    锁定共享资源:latch 用于保护共享资源,防止多个线程或进程同时访问或修改同一资源。
    非抢占性:latch 是非抢占性的,这意味着如果一个线程持有一个 latch,那么它将持有到释放为止,其它线程或进程必须等待 latch 被释放。
    层次性:在 He3DB 中,latch 通常是分层的,这样可以更细粒度地控制资源的访问。
    内存中的同步:latch 是使用内存中的数据结构实现的,它们不依赖于内核级别的锁机制。
    重入性:latch 允许同一个线程多次获取同一个 latch,这意味着在一个函数中可能会嵌套多次获取同一个 latch。
    获取 latch(LWLockAcquire):线程或进程尝试获取 latch 保护的资源。
    释放 latch(LWLockRelease):线程或进程完成对资源的操作后,释放 latch。
    源码:
typedef struct Latch { sig_atomic_t is_set; sig_atomic_t maybe_sleeping; bool is_shared; int owner_pid; #ifdef WIN32 HANDLE event; #endif } Latch;
  1. LSN(Log Sequence Number,日志序列号)是一个非常重要的概念,用于唯一标识写入事务日志(WAL,Write-Ahead Logging)中的每一个日志记录。LSN 在 He3DB 数据库的事务一致性和崩溃恢复机制中发挥着关键作用。
    特点:
    唯一标识:每个 LSN 都是数据库中 WAL 日志的一个点,它以一个指定的格式表示日志记录在 WAL 中的位置(字节偏移量)。LSN 由两个整数部分组成:提交时间戳(通常以 64 位表示);在该时间戳下的字节偏移量(也通常以 64 位表示)。
    事务一致性:He3DB 使用 LSN 来跟踪哪些数据已经写入到磁盘中,从而确保在系统崩溃时能够恢复所有已提交的事务。这种机制称为写前日志(Write-Ahead Logging),要求所有修改必须先记录到 WAL 中,然后才能应用到实际的数据页上。
    崩溃恢复:在发生崩溃时,He3DB 可以使用 LSN 来定位 WAL 的特定位置,以恢复到一致的状态。这一过程可以有效确保数据不丢失且数据库处于一致性状态。
    流复制:在 He3DB 的流复制(streaming replication)中,LSN 也用于跟踪主数据库和从数据库之间的状态。它帮助从库确定需要从主库接收哪些 WAL 记录以保持数据同步。
    监控与诊断:在数据库监控中,LSN 用于分析和获取有关数据更改频率的信息。用户可以通过查看 LSN 来了解 WAL 的生成情况,以及进行性能调优。
    源码:
1. XLogRecPtr waitLSN; /* waiting for this LSN or higher */

原理分析

在这里插入图片描述

原理分析

步骤1:主库插入数据,刷 XLOG;
步骤2:调用 SyncRepWaitForLSN 等待,并将本进程插入 SyncRepQueue 队列中;
步骤3:备库 walreceiver 进程将 XLOG 刷入磁盘,并且通知主库的 walsender 进程;
步骤4:主库 walsender 进程收到备库的消息,使用 SyncRepWakeQueue 唤醒所有等待队列中的进程,并将其移出队列;
步骤5:主库执行 SQL 的进程继续执行,通知其他进程本事物已提交。

在这里插入图片描述

同步复制进程关系

核心进程同步模型:
SyncRepQueue:维护在共享内存中的队列。位于 WalSndCtlData 结构体中,可用于处理同步复制中的同步关系(备库 XLOG 落盘停止阻塞)。用于维护需要等待同步提交的进程队列;
SyncRepQueueInsert: 作用是把该进程插入 SyncRepQueue 队列中,然后开始等待;
SyncRepCancelWait:停止等待,并将该进程从队列中移除;
SyncRepWakeQueue:唤醒队列中所有等待的进程,并将所有进程移除队列;

源码分析:

CommitTransaction @ src/backend/access/transam/xact.c
RecordTransactionCommit @ src/backend/access/transam/xact.c

1. /* 2. * If we didn't create XLOG entries, we're done here; otherwise we 3. * should trigger flushing those entries the same as a commit record 4. * would. This will primarily happen for HOT pruning and the like; we 5. * want these to be flushed to disk in due time. 6. */ 7. if (!wrote_xlog) // 没有产生redo的事务,直接返回 8. goto cleanup; 9. 10. if (wrote_xlog && markXidCommitted) // 如果产生了redo, 等待同步流复制 11. SyncRepWaitForLSN(XactLastRecEnd); SyncRepWaitForLSN @ src/backend/replication/syncrep.c 1.void 2.SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) 3.{ 4. if (!SyncRepRequested() || !SyncStandbysDefined()) // 如果不是同步事务或者没有定义同步流复制节点,直接返回 5. return; 6. if (commit) 7. mode = SyncRepWaitMode; 8. else 9. mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH); 10. 11. Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); 12. Assert(WalSndCtl != NULL); 13. 14. LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); 15. Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING); 16. if (!WalSndCtl->sync_standbys_defined || 17. lsn <= WalSndCtl->lsn[mode]) 18. { 19. LWLockRelease(SyncRepLock); 20. return; 21. } 22. MyProc->waitLSN = lsn;/*将当前进程需要等待的日志序列号(LSN, Log Sequence Number)设置为 lsn。这个 LSN 通常表示数据库日志中某个特定的点,用于确保数据的一致性。*/ 23. MyProc->syncRepState = SYNC_REP_WAITING;/*表示进程正在等待其他节点确认日志已经被同步到其节点上。*/ 24. SyncRepQueueInsert(mode);将当前进程插入到同步复制队列中,这样它就可以在日志同步时被适当处理。*/ 25. Assert(SyncRepQueueIsOrderedByLSN(mode)); /*这是一个断言,检查同步复制队列是否按 LSN 顺序排列。这是为了确保队列中的条目是按照正确的顺序处理的,维护数据的一致性。*/ 26. LWLockRelease(SyncRepLock); 27.for (;;) 28. { 29. int rc; 30. ResetLatch(MyLatch); /*重新设置 MyLatch,这是一个用于进程间通信的机制。每次在检查状态之前,都需要重置它。*/ 31. if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE) 32. break; 33. if (ProcDiePending) /*如果 ProcDiePending 为真,表示进程正在被终止。*/ 34. { 35. ereport(WARNING, 36. (errcode(ERRCODE_ADMIN_SHUTDOWN), 37. errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"), 38. errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); 39. whereToSendOutput = DestNone; 40. SyncRepCancelWait(); 41. break; 42. } 43. 44. 45. if (QueryCancelPending)/*果 QueryCancelPending 为真,表示查询取消中断到达。这里的处理方式是取消等待,并输出警告,说明由于用户请求取消同步复制等待。事务已经在本地提交,但可能尚未复制到备用节点。*/ 46. { 47. QueryCancelPending = false; 48. ereport(WARNING, 49. (errmsg("canceling wait for synchronous replication due to user request"), 50. errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); 51. SyncRepCancelWait(); 52. break; 53. } 54. rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1, 55. WAIT_EVENT_SYNC_REP); /*等待 MyLatch,任何需要唤醒进程的条件都会设置 MyLatch。由于等待没有超时(-1 表示无限期等待),所以不会设置超时。*/ 56. if (rc & WL_POSTMASTER_DEATH) /*如果 rc 包含 WL_POSTMASTER_DEATH,表示主进程已经死亡,进程将退出。*/ 57. { 58. ProcDiePending = true; 59. whereToSendOutput = DestNone; 60. SyncRepCancelWait(); 61. break; 62. } 63. } 64. 65. 66. pg_read_barrier(); /*确保所有内存操作在此之前完成,读取队列的更改。*/ 67. Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); /*确保进程的同步复制链接已经从队列中分离。*/ 68. MyProc->syncRepState = SYNC_REP_NOT_WAITING; /*重置同步复制状态。*/ 69. MyProc->waitLSN = 0; /*重置等待的 LSN。*/ 70. 71. if (new_status) 72. { 73. /* Reset ps display */ 74. set_ps_display(new_status); 75. pfree(new_status); 76. } 77.}

注意用户进入等待状态后,只有主动cancel , 或者kill(terminate) , 或者主进程die才能退出无限的等待状态。
SyncRepQueueInsert@ src/backend/replication/syncrep.c

1.SyncRepQueueInsert(int mode) 2.{ 3. PGPROC *proc; 4. Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);/*这是一个断言,确保传入的 mode 参数在有效范围内。NUM_SYNC_REP_WAIT_MODE 是同步复制模式的数量。*/ 5. proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]), 6. &(WalSndCtl->SyncRepQueue[mode]), 7. offsetof(PGPROC, syncRepLinks));/*获取当前同步复制队列中,MyProc 前一个元素的指针。SHMQueuePrev 是一个函数,用于在共享内存队列中查找前一个元素,offsetof(PGPROC, syncRepLinks) 是获取 syncRepLinks 成员在结构体中的偏移量。*/ 8. while (proc) /*遍历队列中的每个进程,直到队列结束或找到一个应该插入的合适位置。*/ 9. { 10. if (proc->waitLSN < MyProc->waitLSN) 11. break; 12. proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]), 13. &(proc->syncRepLinks), 14. offsetof(PGPROC, syncRepLinks)); 15. } 16. 17. if (proc) 18. SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks)); 19. else 20. SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks)); 21.}

src/backend/replication/walsender.c进程

1.StartReplication 2.WalSndLoop 3.ProcessRepliesIfAny 4.ProcessStandbyMessage 5.ProcessStandbyReplyMessage 6. if (!am_cascading_walsender) // 非级联流复制节点,那么它将调用SyncRepReleaseWaiters修改backend process等待队列中它们对应的 latch。 7. SyncRepReleaseWaiters();

SyncRepReleaseWaiters @ src/backend/replication/syncrep.c

1.void 2.SyncRepReleaseWaiters(void) 3.{ 4.... 5. // 释放满足条件的等待队列 6. /* 7. * Set the lsn first so that when we wake backends they will release up to 8. * this location. 9. */ 10. if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write) 11. { 12. walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write; 13. numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); 14. } 15. if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush) 16. { 17. walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; 18. numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); 19. }

SyncRepWakeQueue @ src/backend/replication/syncrep.c

1.static int 2.SyncRepWakeQueue(bool all, int mode) 3.{ 4.... 5. while (proc) // 修改对应的backend process 的latch 6. { 7. /* 8. * Assume the queue is ordered by LSN 9. */ 10. if (!all && walsndctl->lsn[mode] < proc->waitLSN) 11. return numprocs; 12. 13. /* 14. * Move to next proc, so we can delete thisproc from the queue. 15. * thisproc is valid, proc may be NULL after this. 16. */ 17. thisproc = proc; 18. proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), 19. &(proc->syncRepLinks), 20. offsetof(PGPROC, syncRepLinks)); 21. 22. /* 23. * Set state to complete; see SyncRepWaitForLSN() for discussion of 24. * the various states. 25. */ 26. thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE; // 满足条件时,改成SYNC_REP_WAIT_COMPLETE 27.....

参考链接:https://developer.aliyun.com/article/55676
参考链接:https://zhuanlan.zhihu.com/p/587150379

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论