暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

海山数据库(He3DB)技术分析:异步复制

wukong 2024-08-20
17

异步复制

一、概述

   异步复制(Asynchronous Replication)是He3DB提供的一种数据库复制方式。它允许数据在主服务器和从服务器之间进行复制,而不要求从服务器与主服务器在事务提交时保持同步。主服务器将事务日志(WAL)传送给从服务器,但不等待从服务器确认其已经接收到或应用这些日志。这种复制方式可以减少主服务器的等待时间,提高写性能,但可能导致数据在从服务器上有一定延迟。

二、核心数据结构

   (1)WAL (Write-Ahead Logging):WAL是一种日志记录机制,用于保证数据库的持久性和一致性。在异步复制中,主服务器会将WAL记录发送到从服务器。

1.1.@src/backend/access/transam/xlog.c 2.typedef struct XLogCtlData 3.{ 4. XLogRecPtr Insert; // 当前写入位置 5. char *pages; // WAL页面缓存 6. // ... 其他成员 7.} XLogCtlData;

   (2)Replication Slots:复制槽(Replication Slot)是PostgreSQL中的一种机制,用于确保主服务器不会丢弃从服务器尚未接收到的WAL日志。它能够跟踪每个从服务器的复制进度。

@src/backend/replication/slot.c typedef struct ReplicationSlot { XLogRecPtr candidate_xmin_lsn; /*逻辑解码中,表示候选的 xmin LSN(日志序列号),当客户端确认 flush 操作达到该值后,可以前进到该 LSN。*/ XLogRecPtr candidate_restart_valid; /*表示候选的有效重启 LSN,用于确定可以安全重启的位置。*/ XLogRecPtr candidate_restart_lsn; /*逻辑解码中,表示候选的重启 LSN,当客户端确认 flush 操作达到该值后,可以更新重启 LSN。*/ // ... 其他成员 } ReplicationSlot;

   (3)WalSender和WalReceiver。WalSender:主服务器进程,负责发送WAL日志给从服务器。

src/backend/replication/walsender.c typedef struct WalSnd { XLogRecPtr sentPtr; // 已发送的位置 // ... 其他成员 } WalSnd;

WalReceiver:从服务器进程,负责接收来自主服务器的WAL日志并应用。

@src/backend/replication/walreceiver.h typedef struct { XLogRecPtr receiveStart; // 接收开始位置 XLogRecPtr latestWalEnd; // 接收结束位置 // ... 其他成员 } WalRcvData;

三、函数栈

3.1 主服务器端

主要步骤:
1.WalSndLoop(WalSndSendDataCallback send_data):
  描述: walsender进程的主循环函数。
  主要步骤:
   (1)初始化walsender槽位:
调用 InitWalSenderSlot() 以确保walsender有一个可用的槽位。
   (2)进入主循环:在循环内,主要执行以下任务:
调用 send_data(可以是XLogSendPhysical或XLogSendLogical)发送WAL数据。

处理来自备节点的回复消息(调用ProcessRepliesIfAny)。

检查是否需要发送心跳(调用WalSndKeepaliveIfNecessary)。

计算适当的睡眠时间并等待(调用WalSndWait)。

检查和处理超时(调用WalSndCheckTimeOut)。
核心代码:

static void WalSndLoop(WalSndSendDataCallback send_data) { last_reply_timestamp = GetCurrentTimestamp();/*初始化 last_reply_timestamp 变量,记录当前的时间戳。这是为了启用超时处理。*/ waiting_for_ping_response = false;/*设置 waiting_for_ping_response 标志为 false,表明当前没有在等待 ping 响应。*/ for (;;) { ResetLatch(MyLatch); /*清除之前设置的唤醒标记,以准备接收新的事件。*/ CHECK_FOR_INTERRUPTS(); /*检查是否有中断请求(例如终止请求或其他信号),并处理它们。*/ if (ConfigReloadPending) /*检查是否需要重新加载配置文件。*/ { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); SyncRepInitConfig(); /*初始化同步复制配置。*/ } ProcessRepliesIfAny(); /*处理任何从客户端收到的回复。*/ if (streamingDoneReceiving && streamingDoneSending && !pq_is_send_pending()) break; /*检查是否已经完成了接收和发送操作,并且输出缓冲区为空。如果满足这些条件,说明可以退出流传输。*/ if (!pq_is_send_pending()) /*检查输出缓冲区是否没有待发送的数据。如果没有待发送的数据,则调用 send_data 函数尝试发送更多数据。*/ send_data(); /*调用传入的 send_data 回调函数以发送数据。*/ else WalSndCaughtUp = false; /*如果输出缓冲区有待发送的数据,设置 WalSndCaughtUp 为 false,表示还没有赶上主节点的状态。*/ if (pq_flush_if_writable() != 0) /*尝试将待发送的数据刷新到客户端。如果失败,则调用 WalSndShutdown 函数来关闭流传输。*/ WalSndShutdown(); if (WalSndCaughtUp && !pq_is_send_pending()) { /*检查是否已经赶上主节点的状态且输出缓冲区为空。如果是,并且当前状态是 WALSNDSTATE_CATCHUP,则记录调试信息,并将状态更改为 WALSNDSTATE_STREAMING。*/ if (MyWalSnd->state == WALSNDSTATE_CATCHUP) { ereport(DEBUG1, (errmsg_internal("\"%s\" has now caught up with upstream server", application_name))); WalSndSetState(WALSNDSTATE_STREAMING); } /*检查是否收到 SIGUSR2 信号。如果收到,则调用 WalSndDone 函数来完成流传输并关闭连接。*/ if (got_SIGUSR2) WalSndDone(send_data); } WalSndCheckTimeOut(); /*检查是否发生了复制超时。如果超时,则处理相关逻辑。*/ WalSndKeepaliveIfNecessary(); /*如果需要,发送 keepalive 消息以保持连接活跃。*/ if ((WalSndCaughtUp && send_data != XLogSendLogical && !streamingDoneSending) || pq_is_send_pending()) /*如果已经赶上主节点状态且不需要逻辑复制,或者输出缓冲区有待发送的数据,则阻塞等待事件。*/ { long sleeptime; int wakeEvents; if (!streamingDoneReceiving) /*如果尚未完成接收,则设置 wakeEvents 为 WL_SOCKET_READABLE,表示需要读取事件。*/ else wakeEvents = WL_SOCKET_READABLE; else wakeEvents = 0; sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); /*计算下次唤醒的时间。*/ if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN); /*阻塞等待事件或超时。*/ } } }

2.XLogSendPhysical():
 描述: 发送物理WAL数据。
 主要步骤:
 (1)读取未发送的WAL记录:
使用XLogReadRecord读取未发送的WAL记录。
 (2)打包并发送WAL记录:
将读取的WAL记录打包。
通过网络发送打包的WAL记录到备节点。
核心代码:
 (3)更新发送进度:
调用LagTrackerWrite更新发送进度。

static void XLogSendPhysical(void) { XLogRecPtr SendRqstPtr; /*用于存储请求发送的 WAL 记录位置。*/ XLogRecPtr startptr; XLogRecPtr endptr; /*用于计算实际发送的数据范围。*/ Size nbytes; /*存储要发送的字节数。*/ XLogSegNo segno; /*用于存储当前日志段号。*/ WALReadError errinfo; /*用于存储读取 WAL 时的错误信息。*/ if (got_STOPPING) WalSndSetState(WALSNDSTATE_STOPPING); if (streamingDoneSending) { WalSndCaughtUp = true; return; } /*如果接收到停止信号 (got_STOPPING),则设置 Walsender 的状态为停止。 如果已经完成发送 (streamingDoneSending),则标记 WalSndCaughtUp 为 true,并返回。*/ if (sendTimeLineIsHistoric) /*如果当前时间线已经过时 (sendTimeLineIsHistoric),则 SendRqstPtr 设置为 sendTimeLineValidUpto。*/ { SendRqstPtr = sendTimeLineValidUpto; } else if (am_cascading_walsender) /*如果是级联 Walsender (am_cascading_walsender),则首先获取备用服务器的刷新位置和时间线。*/ { TimeLineID SendRqstTLI; bool becameHistoric = false; SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI); if (!RecoveryInProgress()) /*如果恢复不在进行中,则更新时间线,并设置 becameHistoric 为 true。*/ { SendRqstTLI = GetWALInsertionTimeLine(); am_cascading_walsender = false; becameHistoric = true; } else { if (sendTimeLine != SendRqstTLI) becameHistoric = true; } if (becameHistoric) /*如果时间线变得历史性,将 sendTimeLineValidUpto 更新为新的时间线切换点,并设置 sendTimeLineIsHistoric 为 true。*/ { List *history; history = readTimeLineHistory(SendRqstTLI); sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI); Assert(sendTimeLine < sendTimeLineNextTLI); list_free_deep(history); sendTimeLineIsHistoric = true; SendRqstPtr = sendTimeLineValidUpto; } } else /*如果不是级联 Walsender,则直接获取刷新位置 (GetFlushRecPtr),并设置 SendRqstPtr。*/ { SendRqstPtr = GetFlushRecPtr(NULL); } LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp()); /*记录发送延迟信息 (LagTrackerWrite)。*/ if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr) { /*如果时间线已经过时,并且 sendTimeLineValidUpto 小于等于 sentPtr,则: 关闭 WAL 段 (wal_segment_close)。 发送完成消息 (pq_putmessage_noblock)。 设置 streamingDoneSending 和 WalSndCaughtUp 为 true,并打印日志。*/ if (xlogreader->seg.ws_file >= 0) wal_segment_close(xlogreader); pq_putmessage_noblock('c', NULL, 0); streamingDoneSending = true; WalSndCaughtUp = true; elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)", LSN_FORMAT_ARGS(sendTimeLineValidUpto), LSN_FORMAT_ARGS(sentPtr)); return; } Assert(sentPtr <= SendRqstPtr); if (SendRqstPtr <= sentPtr) { WalSndCaughtUp = true; return; } /*确保 sentPtr 小于等于 SendRqstPtr,如果 SendRqstPtr 小于等于 sentPtr,则 WalSndCaughtUp 设为 true 并返回。*/ startptr = sentPtr; endptr = startptr; endptr += MAX_SEND_SIZE; /*计算要发送的日志块的起始 (startptr) 和结束 (endptr) 位置,确保不超过 SendRqstPtr。*/ if (SendRqstPtr <= endptr) /*如果发送的日志块超出请求位置,调整 endptr。*/ { endptr = SendRqstPtr; if (sendTimeLineIsHistoric) WalSndCaughtUp = false; else WalSndCaughtUp = true; } else { /* round down to page boundary. */ endptr -= (endptr % XLOG_BLCKSZ); WalSndCaughtUp = false; } nbytes = endptr - startptr; /*计算实际发送的字节数 (nbytes)。*/ Assert(nbytes <= MAX_SEND_SIZE); resetStringInfo(&output_message); /*重置 output_message 并初始化发送数据。*/ pq_sendbyte(&output_message, 'w'); pq_sendint64(&output_message, startptr); /* dataStart */ pq_sendint64(&output_message, SendRqstPtr); /* walEnd */ pq_sendint64(&output_message, 0); /* sendtime, filled in last */ /*使用 pq_sendbyte、pq_sendint64 将发送起始位置、结束位置等信息写入 output_message。 enlargeStringInfo(&output_message, nbytes);*/ retry: if (!WALRead(xlogreader, &output_message.data[output_message.len], startptr, nbytes, xlogreader->seg.ws_tli, /* Pass the current TLI because * only WalSndSegmentOpen controls * whether new TLI is needed. */ &errinfo)) WALReadRaiseError(&errinfo); /*调用 WALRead 读取实际的 WAL 数据到 output_message,并处理读取错误。*/ XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize); /*将 startptr 转换为日志段号并检查日志是否已被移除。*/ CheckXLogRemoved(segno, xlogreader->seg.ws_tli); /*如果是级联 Walsender */(am_cascading_walsender),检查是否需要重新加载 WAL 段。 if (am_cascading_walsender) { WalSnd *walsnd = MyWalSnd; bool reload; SpinLockAcquire(&walsnd->mutex); reload = walsnd->needreload; walsnd->needreload = false; SpinLockRelease(&walsnd->mutex); if (reload && xlogreader->seg.ws_file >= 0) { wal_segment_close(xlogreader); goto retry; } } output_message.len += nbytes; output_message.data[output_message.len] = '\0'; resetStringInfo(&tmpbuf); pq_sendint64(&tmpbuf, GetCurrentTimestamp()); memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)], tmpbuf.data, sizeof(int64)); pq_putmessage_noblock('d', output_message.data, output_message.len); /*更新 output_message 并发送数据。*/ sentPtr = endptr; /*更新 sentPtr。*/ { WalSnd *walsnd = MyWalSnd; SpinLockAcquire(&walsnd->mutex); walsnd->sentPtr = sentPtr; SpinLockRelease(&walsnd->mutex); } /*更新 Walsnd 结构中的 sentPtr。*/ if (update_process_title) { char activitymsg[50]; snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", LSN_FORMAT_ARGS(sentPtr)); set_ps_display(activitymsg); /*如果需要更新进程标题,设置为当前正在流式传输的日志位置。*/

3.ProcessRepliesIfAny():
 描述: 处理来自备节点的回复消息。
 主要步骤:
 (1)检查是否有消息:
通过检查网络连接,查看是否有备节点的回复消息。
 (2)处理消息:
根据消息类型(如心跳、反馈等),分别调用ProcessStandbyMessage、ProcessStandbyReplyMessage和ProcessStandbyHSFeedbackMessage进行处理。
核心代码:

static void ProcessRepliesIfAny(void) { unsigned char firstchar; /*定义一个无符号字符变量 firstchar,用来存储接收到的消息的第一个字节(即消息类型)。*/ int maxmsglen; /*定义一个整数变量 maxmsglen,用来设置消息的最大长度。*/ int r; /*定义一个整数变量 r,用来存储读取数据时的结果。*/ bool received = false; /*定义一个布尔变量 received,初始化为 false,用于指示是否接收到有效的消息。*/ last_processing = GetCurrentTimestamp(); /*记录当前时间戳到 last_processing 变量中,表示开始处理消息的时间。*/ while (!streamingDoneReceiving)/*当 streamingDoneReceiving 为 false 时,继续循环处理消息。*/ { pq_startmsgread(); /*准备开始读取消息。*/ r = pq_getbyte_if_available(&firstchar); /*尝试读取一个字节到 firstchar,并将结果存储在 r 中。*/ if (r < 0) /*小于 0,表示发生了错误或到达了文件结束符(EOF)。*/ { /*记录错误信息,显示 “unexpected EOF on standby connection”,并退出进程。*/ ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected EOF on standby connection"))); proc_exit(0); } if (r == 0) /*如果 r 等于 0,表示没有数据可读(非阻塞状态)。*/ { pq_endmsgread(); /*结束读取消息的准备。*/ break; } switch (firstchar) /*根据 firstchar 的值来确定消息类型。*/ { case 'd': /*如果 firstchar 是 'd',设置 maxmsglen 为 PQ_LARGE_MESSAGE_LIMIT,表示较大的消息。*/ maxmsglen = PQ_LARGE_MESSAGE_LIMIT; break; case 'c': case 'X': /*如果 firstchar 是 'c' 或 'X',设置 maxmsglen 为 PQ_SMALL_MESSAGE_LIMIT,表示较小的消息。*/ maxmsglen = PQ_SMALL_MESSAGE_LIMIT; break; default: /*如果 firstchar 是其他值,记录致命错误信息,并将 maxmsglen 设置为 0。*/ ereport(FATAL, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("invalid standby message type \"%c\"", firstchar))); maxmsglen = 0; /* keep compiler quiet */ break; } resetStringInfo(&reply_message); /*重置 reply_message 对象,准备接收新消息。*/ if (pq_getmessage(&reply_message, maxmsglen)) /*: 尝试读取消息到 reply_message 中,最大长度为 maxmsglen。*/ { ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected EOF on standby connection"))); proc_exit(0); } switch (firstchar) /*根据 firstchar 的值来处理消息*/ { case 'd': /*调用 ProcessStandbyMessage() 处理 'd' 类型的消息,并将 received 设置为 true。*/ ProcessStandbyMessage(); received = true; break; /* 如果消息类型是 'c',检查是否已经完成发送:*/ if (!streamingDoneSending): /*如果尚未完成发送,发送 'c' 消息以通知完成*/ case 'c': if (!streamingDoneSending) { pq_putmessage_noblock('c', NULL, 0); streamingDoneSending = true; } streamingDoneReceiving = true; received = true; break; case 'X': /* 如果消息类型是 'X',退出进程。*/ proc_exit(0); default: Assert(false); } } if (received) /*如果在处理过程中接收到有效的消息:*/ { last_reply_timestamp = last_processing; /*更新 last_reply_timestamp 为 last_processing,记录最后一次回复的时间戳。*/ waiting_for_ping_response = false; /*设置 waiting_for_ping_response 为 false,表示不再等待 ping 响应。*/ } }

4.WalSndKeepalive(bool requestReply, XLogRecPtr writePtr):
 描述: 发送心跳信号。
 主要步骤:
 (1)构建心跳消息:
创建一个心跳消息包,包含当前WAL位置和其他必要信息。
 (2)发送心跳消息:
通过网络将心跳消息发送给备节点,确保连接的存活。
核心代码:

static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr) { elog(DEBUG2, "sending replication keepalive"); /* construct the message... */ resetStringInfo(&output_message); pq_sendbyte(&output_message, 'k'); pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);/*向 output_message 中添加一个 64 位的整数值。如果 writePtr 无效(通过 XLogRecPtrIsInvalid 函数检查),则使用 sentPtr;否则使用 writePtr。这个值通常表示日志写入的位置,用于复制协议的消息。*/ pq_sendint64(&output_message, GetCurrentTimestamp()); /*向 output_message 中添加当前时间的时间戳(以 64 位整数表示)。GetCurrentTimestamp 函数获取当前的时间戳,这个时间戳用于标识消息的发送时间。*/ pq_sendbyte(&output_message, requestReply ? 1 : 0); /*向 output_message 中添加一个字节。如果 requestReply 为 true,则添加字节 1;否则添加字节 0。这个字节表示是否请求对消息的回复。*/ pq_putmessage_noblock('d', output_message.data, output_message.len); /*通过 pq_putmessage_noblock 函数发送消息。消息类型为 'd'(复制数据消息),output_message.data 是消息的实际内容,output_message.len 是消息的长度。pq_putmessage_noblock 函数以非阻塞模式将消息发送出去。*/ /* Set local flag */ if (requestReply) waiting_for_ping_response = true; /*用于指示系统正在等待对keeplalive消息的回复。*/ }

5.WalSndWaitForWal(XLogRecPtr loc):
 描述: 等待新的WAL记录。
 主要步骤:
 (1)检查是否有未发送的WAL记录:
如果当前没有新的WAL记录,则进入等待状态。
 (2)进入等待状态:
调用WalSndWait等待新的WAL记录生成。
核心代码:

WalSndWaitForWal(XLogRecPtr loc) { int wakeEvents; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; /*跟踪最近的 WAL 刷新位置,初始值为 InvalidXLogRecPtr(无效位置)。*/ if (RecentFlushPtr != InvalidXLogRecPtr && loc <= RecentFlushPtr) return RecentFlushPtr; /*检查 RecentFlushPtr 是否有效并且 loc 是否小于等于 RecentFlushPtr。如果条件满足,直接返回 RecentFlushPtr,表示已经有足够的 WAL 数据。*/ if (!RecoveryInProgress()) RecentFlushPtr = GetFlushRecPtr(NULL); else RecentFlushPtr = GetXLogReplayRecPtr(NULL); /*如果当前不在恢复模式下,则调用 GetFlushRecPtr 获取最新的 WAL 刷新位置,并更新 RecentFlushPtr。如果在恢复模式下,则调用 GetXLogReplayRecPtr 来获取最新的 WAL 重放位置。*/ for (;;) { long sleeptime; /*计算休眠时间*/ ResetLatch(MyLatch); /*重置 MyLatch,以清除任何已挂起的唤醒信号。*/ CHECK_FOR_INTERRUPTS(); /*检查是否有中断信号(如终止请求),以便可以响应这些信号。*/ if (ConfigReloadPending) { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); SyncRepInitConfig(); } /*如果配置重载标志 ConfigReloadPending 被设置为 true,处理配置文件并重新初始化同步复制配置。*/ ProcessRepliesIfAny(); /*处理可能的客户端回复。*/ if (got_STOPPING) XLogBackgroundFlush(); /*如果系统收到停止信号,触发后台 WAL 刷新,以确保所有 WAL 数据被写入。*/ if (!RecoveryInProgress()) RecentFlushPtr = GetFlushRecPtr(NULL); else RecentFlushPtr = GetXLogReplayRecPtr(NULL); /*重新获取最新的 WAL 刷新位置。*/ if (got_STOPPING) break; /*如果收到停止信号,退出循环。*/ if (MyWalSnd->flush < sentPtr && MyWalSnd->write < sentPtr && !waiting_for_ping_response) WalSndKeepalive(false, InvalidXLogRecPtr); /*如果当前的 flush 和 write 位置都小于 sentPtr,并且没有在等待 ping 响应,发送一个 keepalive 消息以保持连接。*/ if (loc <= RecentFlushPtr) break; /*检查目标 WAL 位置 loc 是否小于等于 RecentFlushPtr,如果满足条件,退出循环。*/ WalSndCaughtUp = true;/*设置 WalSndCaughtUp 标志为 true,表示当前已跟上 WAL 发送的进度。*/ if (pq_flush_if_writable() != 0) WalSndShutdown(); /*尝试刷新所有可写的输出缓冲区。如果刷新失败,调用 WalSndShutdown 进行关闭处理。*/ if (streamingDoneReceiving && streamingDoneSending && !pq_is_send_pending()) break; /*如果已完成数据接收和发送,并且没有待发送的输出数据,退出循环。*/ WalSndCheckTimeOut();/*检查是否超时,如果超时则执行相关处理。*/ WalSndKeepaliveIfNecessary(); /*如果需要,发送 keepalive 消息以保持连接。*/ sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); /*计算下次唤醒的时间。*/ wakeEvents = WL_SOCKET_READABLE; /*初始化 wakeEvents,表示等待 socket 可读事件。*/ if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; /*如果有待发送的输出数据,将 wakeEvents 更新为等待 socket 可写事件。*/ WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);/*调用 WalSndWait 函数,等待发生的事件或超时。 */ } SetLatch(MyLatch); return RecentFlushPtr; /*循环结束后,重新激活 MyLatch 以便 WalSndLoop 知道需要继续运行,返回 RecentFlushPtr 作为结果。*/ }

6.LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time):
 描述: 记录备节点的滞后信息。
 主要步骤:
 (1)记录当前WAL位置及时间戳:
将当前WAL的位置及时间戳记录到滞后追踪器中。
 (2)更新滞后追踪信息:
调用LagTrackerRead读取并更新滞后信息。

3.2 从服务器端

1.WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI):
 描述: 确定从哪里开始接收WAL数据。
 主要步骤:
 (1)读取开始位置及时间线ID:
从控制文件或其他持久化存储中读取开始位置和时间线ID。
 (2)设置初始WAL接收位置:
设置初始的WAL接收位置及时间线ID。
核心源码:

static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) { WalRcvData *walrcv = WalRcv; /*WalRcv 是指向 WAL 发送器数据结构的指针。*/ int state; SpinLockAcquire(&walrcv->mutex); /*使用 SpinLockAcquire 获取对 walrcv 数据的自旋锁,以保护对共享数据的访问。*/ state = walrcv->walRcvState; /*读取 WAL 发送器的状态 state。*/ if (state != WALRCV_STREAMING) /*如果状态不是 WALRCV_STREAMING,释放锁并检查状态: */ { SpinLockRelease(&walrcv->mutex); if (state == WALRCV_STOPPING) proc_exit(0); else elog(FATAL, "unexpected walreceiver state"); } walrcv->walRcvState = WALRCV_WAITING; /*将 WAL 发送器的状态设置为 WALRCV_WAITING,表示它在等待新的指令。*/ walrcv->receiveStart = InvalidXLogRecPtr; walrcv->receiveStartTLI = 0; /*将 receiveStart 和 receiveStartTLI 设置为无效值,表示尚未指定开始位置。*/ SpinLockRelease(&walrcv->mutex); /*释放自旋锁。*/ set_ps_display("idle"); /*调用 set_ps_display("idle") 更新进程标题显示为“idle”。*/ WakeupRecovery(); /*调用 WakeupRecovery() 通知恢复进程,WAL 发送器已停止流式传输,现在正在等待新的指令。*/ for (;;) { ResetLatch(MyLatch); /*使用 ResetLatch(MyLatch) 重置当前的自旋锁,以准备处理新的事件。*/ ProcessWalRcvInterrupts(); /*调用 ProcessWalRcvInterrupts() 处理 WAL 发送器的中断。*/ SpinLockAcquire(&walrcv->mutex); /*获取自旋锁保护对 walrcv 状态的访问。*/ Assert(walrcv->walRcvState == WALRCV_RESTARTING || walrcv->walRcvState == WALRCV_WAITING || walrcv->walRcvState == WALRCV_STOPPING); if (walrcv->walRcvState == WALRCV_RESTARTING) { /*确保 WAL 发送器的状态为 WALRCV_RESTARTING、WALRCV_WAITING 或 WALRCV_STOPPING。 如果状态为 WALRCV_RESTARTING,更新 startpoint 和 startpointTLI,然后将状态设置为 WALRCV_STREAMING,释放自旋锁并跳出循环。 如果状态为 WALRCV_STOPPING,释放自旋锁并调用 exit(1) 终止进程。*/ *startpoint = walrcv->receiveStart; *startpointTLI = walrcv->receiveStartTLI; walrcv->walRcvState = WALRCV_STREAMING; SpinLockRelease(&walrcv->mutex); break; } if (walrcv->walRcvState == WALRCV_STOPPING) { /*如果状态不是上述值,释放自旋锁并调用 WaitLatch() 进入等待状态,直到被唤醒或进程终止。*/ SpinLockRelease(&walrcv->mutex); exit(1); } SpinLockRelease(&walrcv->mutex); (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, WAIT_EVENT_WAL_RECEIVER_WAIT_START); } if (update_process_title) { /*如果 update_process_title 为真,更新进程标题以反映当前正在重启的位置,显示为“restarting at XLogRecPtr”格式。*/ char activitymsg[50]; snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X", LSN_FORMAT_ARGS(*startpoint)); set_ps_display(activitymsg); } }

2.XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli):
 描述: 处理来自walsender的消息。
 主要步骤:
 (1)根据消息类型处理:
如果消息类型是WAL数据,调用XLogWalRcvWrite。
如果消息类型是心跳或反馈,更新本地状态。

static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) { int hdrlen; /*消息头的长度*/ XLogRecPtr dataStart; /*数据开始位置*/ XLogRecPtr walEnd; /*WAL 结束位置*/ TimestampTz sendTime; /*发送时间*/ bool replyRequested; /*是否请求回复*/ resetStringInfo(&incoming_message); /*清空 incoming_message 结构体,用于存储接收到的消息数据。*/ switch (type) { /*计算 WAL 记录消息头的长度(3 个 int64),并检查接收消息的长度是否足够。 如果消息长度不够,抛出协议违规错误。 将消息头部分复制到 incoming_message 结构体中。*/ case 'w': /*根据消息类型进行处理,'w' 表示 WAL 记录消息。*/ { /* copy message to StringInfo */ hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64); if (len < hdrlen) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid WAL message received from primary"))); appendBinaryStringInfo(&incoming_message, buf, hdrlen); /* read the fields */ dataStart = pq_getmsgint64(&incoming_message); /*从 incoming_message 结构体中读取 dataStart、walEnd 和 sendTime。*/ walEnd = pq_getmsgint64(&incoming_message); sendTime = pq_getmsgint64(&incoming_message); ProcessWalSndrMessage(walEnd, sendTime); /*调用 ProcessWalSndrMessage 函数处理 WAL 发送器消息,传入 walEnd 和 sendTime。*/ buf += hdrlen; len -= hdrlen; /*更新 buf 和 len,跳过消息头部分。*/ XLogWalRcvWrite(buf, len, dataStart, tli); /*调用 XLogWalRcvWrite 函数写入 WAL 数据。*/ break; } case 'k': /* 处理 case 'k',表示心跳消息。*/ { /* 计算心跳消息头的长度(1 个 int64 + 1 个 char),并检查消息长度是否匹配。 如果消息长度不匹配,抛出协议违规错误。 将消息头部分复制到 incoming_message 结构体中。 */ hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char); if (len != hdrlen) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid keepalive message received from primary"))); appendBinaryStringInfo(&incoming_message, buf, hdrlen); /*从 incoming_message 结构体中读取 walEnd、sendTime 和 replyRequested。 调用 ProcessWalSndrMessage 处理 WAL 发送器消息。 */ walEnd = pq_getmsgint64(&incoming_message); sendTime = pq_getmsgint64(&incoming_message); replyRequested = pq_getmsgbyte(&incoming_message); ProcessWalSndrMessage(walEnd, sendTime); /* If the primary requested a reply, send one immediately */ if (replyRequested) XLogWalRcvSendReply(true, false); break; } default: /*如果心跳消息中请求了回复,则调用 XLogWalRcvSendReply 发送回复。 处理完 case 'k' 的情况后退出 switch 语句。*/ ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid replication message type %d", type))); } }

3.XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli):
 描述: 将接收到的WAL数据写入本地。
 主要步骤:
 (1)写入WAL记录到本地:
将接收的WAL数据写入本地WAL文件。
 (2)更新写入进度:
调用XLogWalRcvFlush确保数据持久化。
核心代码:

static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli) { int startoff; int byteswritten; /*startoff 用于记录日志的起始偏移量,byteswritten 用于记录实际写入的字节数。*/ Assert(tli != 0); while (nbytes > 0) /*还有字节需要写入时,进入循环。*/ { int segbytes; /*算在当前日志段中要写入的字节数。*/ if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) XLogWalRcvClose(recptr, tli); /*如果当前文件 recvFile 是有效的,并且 recptr 指针超出了当前日志段的范围,则调用 XLogWalRcvClose 关闭当前段。*/ if (recvFile < 0) { /*如果当前没有有效的接收文件,则需要创建一个新的日志文件。通过 XLByteToSeg 计算日志段号,并调用 XLogFileInit 初始化新文件,同时更新 recvFileTLI 为当前时间线 ID。*/ XLByteToSeg(recptr, recvSegNo, wal_segment_size); recvFile = XLogFileInit(recvSegNo, tli); recvFileTLI = tli; } /*计算接收日志的起始偏移量,XLogSegmentOffset 计算 recptr 在当前日志段中的偏移量。*/ startoff = XLogSegmentOffset(recptr, wal_segment_size); if (startoff + nbytes > wal_segment_size) segbytes = wal_segment_size - startoff; else segbytes = nbytes; /*确定在当前日志段中实际需要写入的字节数 segbytes。如果要写入的数据超出了当前段的范围,则限制写入到段的结尾。*/ errno = 0; byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff); if (byteswritten <= 0) { char xlogfname[MAXFNAMELEN]; int save_errno; if (errno == 0) errno = ENOSPC; save_errno = errno; XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); errno = save_errno; ereport(PANIC, (errcode_for_file_access(), errmsg("could not write to log segment %s " "at offset %u, length %lu: %m", xlogfname, startoff, (unsigned long) segbytes))); } /*通过 pg_pwrite 写入日志数据到文件。如果写入失败,检查 errno 是否为零,如果是,则设置为 ENOSPC(磁盘空间不足)。然后生成日志文件名,并报告错误信息并中止程序。*/ recptr += byteswritten; nbytes -= byteswritten; buf += byteswritten; LogstreamResult.Write = recptr; /*更新写入的状态,增加 recptr 以反映实际写入的字节数。减少剩余要写入的字节数 nbytes,同时更新缓冲区 buf 的指针。更新 LogstreamResult.Write 记录写入进度。*/ } pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);/*使用原子操作更新共享内存中的 WalRcv->writtenUpto,以反映最新的写入位置。*/ if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) XLogWalRcvClose(recptr, tli);/*如果在循环的最后一次迭代中当前段已经完全写入,并且 recptr 指针超出了当前段的范围,则关闭当前日志段。否则,WAL 归档将被延迟,直到接收到并写入下一个段的数据。*/ }

4.XLogWalRcvFlush(bool dying, TimeLineID tli):
 描述: 确保WAL数据持久化。
主要步骤:
 (1)刷新WAL数据到磁盘:
将缓冲区中的WAL数据刷新到磁盘,确保其持久化。
 (2)更新本地持久化进度:
更新本地的WAL持久化进度,以便在故障恢复时能正确继续。
核心代码:

static void XLogWalRcvFlush(bool dying, TimeLineID tli) { Assert(tli != 0);/*确保时间线 ID (tli) 不为零。如果 tli 为零,则触发断言失败,说明代码逻辑可能出现了问题。*/ if (LogstreamResult.Flush < LogstreamResult.Write)/*检查是否需要刷新。LogstreamResult.Flush 表示上次已刷新到的位置,而 LogstreamResult.Write 表示当前写入的位置。如果 Flush 小于 Write,说明需要进行刷新操作。*/ { WalRcvData *walrcv = WalRcv;/*获取 WAL 接收器的共享数据结构 WalRcv 的指针,方便后续操作。*/ issue_xlog_fsync(recvFile, recvSegNo, tli); /*调用 issue_xlog_fsync 函数,对当前接收的 WAL 文件进行同步(即将数据写入磁盘)。recvFile 是接收的文件,recvSegNo 是段号,tli 是时间线 ID。*/ LogstreamResult.Flush = LogstreamResult.Write; /*更新 LogstreamResult.Flush 为当前写入的位置。这意味着所有从 Flush 到 Write 的数据都已经被刷新到磁盘上。*/ SpinLockAcquire(&walrcv->mutex); /*获取 walrcv 的自旋锁,保护对共享内存的访问。*/ if (walrcv->flushedUpto < LogstreamResult.Flush) { /*检查是否需要更新 flushedUpto,即上次刷新位置是否小于当前的位置。 walrcv->latestChunkStart = walrcv->flushedUpto;: 记录最新的块起始位置。 walrcv->flushedUpto = LogstreamResult.Flush;: 更新 flushedUpto 为最新的刷新位置。 walrcv->receivedTLI = tli;: 更新 receivedTLI 为当前的时间线 ID。 */ walrcv->latestChunkStart = walrcv->flushedUpto; walrcv->flushedUpto = LogstreamResult.Flush; walrcv->receivedTLI = tli; } SpinLockRelease(&walrcv->mutex); /*释放自旋锁。*/ WakeupRecovery(); /*向恢复进程发出信号,表示有新的 WAL 到达。*/ if (AllowCascadeReplication()) /*检查是否允许级联复制。*/ WalSndWakeup(); /* 向 WAL 发送器发出信号,表示有新的 WAL 到达。*/ if (update_process_title) /*如果需要更新进程标题,进行以下操作。*/ { char activitymsg[50]; /*定义一个字符数组,用于存储活动信息。*/ snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", /*格式化字符串,生成进程标题信息。LSN_FORMAT_ARGS 是一个宏,格式化写入的位置。*/ LSN_FORMAT_ARGS(LogstreamResult.Write)); set_ps_display(activitymsg); /*更新进程标题为当前的活动信息。*/ } if (!dying) /*如果接收器没有在关闭中,进行以下操作。*/ { XLogWalRcvSendReply(false, false); /*向主节点发送回复,表示接收器已经处理了一些 WAL。*/ XLogWalRcvSendHSFeedback(false); /*向主节点发送心跳反馈。*/ } } }

5.XLogWalRcvSendReply(bool force, bool requestReply):
 描述: 向walsender发送回复。
 主要步骤:
 (1)构建回复消息:
构建包含当前接收的WAL位置等信息的回复消息。
 (2)发送回复消息:
通过网络将回复消息发送给walsender。
核心代码:

static void XLogWalRcvSendReply(bool force, bool requestReply) { static XLogRecPtr writePtr = 0; static XLogRecPtr flushPtr = 0; /*用于存储上一次发送的写入和刷新位置。*/ XLogRecPtr applyPtr; /* 用于存储应用位置(将被实时计算)。*/ static TimestampTz sendTime = 0; /*用于记录上一次发送消息的时间。*/ TimestampTz now; /*用于存储当前的时间戳。*/ if (!force && wal_receiver_status_interval <= 0) /*如果 force 为 false 且 wal_receiver_status_interval 小于或等于 0,则不发送状态消息,直接返回。*/ return; now = GetCurrentTimestamp(); /*获取当前的时间戳,并将其存储在 now 中。*/ if (!force && writePtr == LogstreamResult.Write && flushPtr == LogstreamResult.Flush && !TimestampDifferenceExceeds(sendTime, now, wal_receiver_status_interval * 1000)) return; /*如果 force 为 false 且当前的 writePtr 和 flushPtr 没有变化,并且距离上次发送的时间小于 wal_receiver_status_interval 的限制,则不需要重新发送状态消息,直接返回。*/ sendTime = now; /*更新 sendTime 为当前时间,准备下一次发送消息。*/ writePtr = LogstreamResult.Write; flushPtr = LogstreamResult.Flush; applyPtr = GetXLogReplayRecPtr(NULL); /*更新 writePtr 和 flushPtr 为当前日志流的写入和刷新位置。获取 applyPtr,即 WAL 的应用位置。*/ resetStringInfo(&reply_message); /*先重置 reply_message。*/ pq_sendbyte(&reply_message, 'r'); /*使用 pq_sendbyte 发送一个标识字符 'r'。*/ pq_sendint64(&reply_message, writePtr); /*使用 pq_sendint64 发送 writePtr、flushPtr、applyPtr 和当前时间戳。*/ pq_sendint64(&reply_message, flushPtr); /*使用 pq_sendbyte 发送 requestReply 标志,决定是否需要回应。*/ pq_sendint64(&reply_message, applyPtr); pq_sendint64(&reply_message, GetCurrentTimestamp()); pq_sendbyte(&reply_message, requestReply ? 1 : 0); /* Send it */ elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s", /*记录调试日志,显示发送的写入、刷新和应用位置,及是否请求回应的状态。*/ LSN_FORMAT_ARGS(writePtr), LSN_FORMAT_ARGS(flushPtr), LSN_FORMAT_ARGS(applyPtr), requestReply ? " (reply requested)" : ""); walrcv_send(wrconn, reply_message.data, reply_message.len); /*通过 walrcv_send 函数发送构建好的消息 reply_message 到 wrconn 连接。*/ }

6.XLogWalRcvSendHSFeedback(bool immed):
 描述: 发送热备反馈信息。
 主要步骤:
 (1)构建热备反馈消息:
构建包含备节点活动事务信息的热备反馈消息。
 (2)发送热备反馈消息:
通过网络将热备反馈消息发送给walsender。

7.ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime):
 描述: 处理来自主节点的WAL结束信息。
 主要步骤:
 (1)解析WAL结束信息:
解析从主节点接收到的WAL结束位置和发送时间信息。
 (2)更新WAL接收进度:
更新本地的WAL接收进度,以确保数据同步。
核心代码:

static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) { WalRcvData *walrcv = WalRcv; TimestampTz lastMsgReceiptTime = GetCurrentTimestamp(); /*创建了一个TimestampTz类型的变量lastMsgReceiptTime,并为其赋值为调用函数GetCurrentTimestamp()返回的当前时间戳。*/ SpinLockAcquire(&walrcv->mutex); /*这部分代码用于获取WalRcvData结构体中名为mutex的互斥锁。在更新共享内存中的状态之前,需要先获取这个锁。*/ if (walrcv->latestWalEnd < walEnd) /*检查当前共享内存中的最新WAL结束位置是否小于传递进来的walEnd参数。*/ walrcv->latestWalEndTime = sendTime; /*如果上面的条件为真,则将sendTime赋值给WalRcvData结构体中名为latestWalEndTime的字段。*/ walrcv->latestWalEnd = walEnd; /*walEnd赋值给WalRcvData结构体中名为latestWalEnd的字段。 walrcv->lastMsgSendTime = sendTime; /*将sendTime赋值给WalRcvData结构体中名为lastMsgSendTime的字段。*/ walrcv->lastMsgReceiptTime = lastMsgReceiptTime; /*将之前获取的当前时间戳lastMsgReceiptTime赋值给WalRcvData结构体中名为lastMsgReceiptTime的字段。*/ SpinLockRelease(&walrcv->mutex); /*这部分代码用于释放之前获取的WalRcvData结构体中名为mutex的互斥锁。*/ if (message_level_is_interesting(DEBUG2)) { char *sendtime; char *receipttime; int applyDelay; /* Copy because timestamptz_to_str returns a static buffer */ sendtime = pstrdup(timestamptz_to_str(sendTime)); receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime)); /*将lastMsgReceiptTime转换为字符串并复制。*/ applyDelay = GetReplicationApplyDelay(); /*调用GetReplicationApplyDelay函数,该函数用于获取复制应用延迟的值。*/ /* apply delay is not available */ if (applyDelay == -1) elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms", sendtime, receipttime, GetReplicationTransferLatency()); else elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms", sendtime, receipttime, applyDelay, GetReplicationTransferLatency()); pfree(sendtime); pfree(receipttime); } }

作者介绍

张杰,移动云数据库工程师,负责云原生数据库He3DB的研发。

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论