暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

He3DB源码解读:主备流复制建立流程中的跨时间线场景

原创 借个背包去旅行 2023-11-06
154

He3DB 是由移动云数据库团队研发的一款计算/存储分离的云原生数据库,He3DB通过计算/存储分离、数据冷热分层和压缩、智能中间件等技术,来保证高性能和低成本完美兼得,在获得高性能的同时,最大化的帮助客户节省数据库使用成本。

场景描述:

  1. master、slave1、slave2
  2. kill -9 slave2
  3. master do some insert
  4. slave1 promote
  5. slave2 connect to slave1

本场景涉及到startup进程和walreceiver,主要涉及的函数有WaitForWALToBecomeAvailable和WalReceiverMain。WaitForWALToBecomeAvailable函数用于打开包含指定LSN wal日志的segment文件,如果处于备机模式,将会一直等待新日志同步。在这个过程中需要实现日志源切换、时间线选取、流复制建立、walreceiver进程消息监听。WalReceiverMain负责与主机通信获取指定wal日志。

下面详细说明第五步slave2连接到新主slave1期间的主备流复制建立过程。

Stage 1

slave2首先进入常规的crash recovery流程,从pg_wal和archive读取日志进行replay,此时currentSource==XLOG_FROM_ARCHIVE。

xlog.c 从pg_wal和archive读取日志

switch (currentSource)

{

case XLOG_FROM_ARCHIVE:

case XLOG_FROM_PG_WAL:

 

/*

* WAL receiver must not be running when reading WAL from

* archive or pg_wal.

*/

Assert(!WalRcvStreaming());

 

/* Close any old file we might have open. */

if (readFile >= 0)

{

close(readFile);

readFile = -1;

}

/* Reset curFileTLI if random fetch. */

if (randAccess)

curFileTLI = 0;

 

/*

* Try to restore the file from archive, or read an existing

* file from pg_wal.

*/

readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2,

currentSource == XLOG_FROM_ARCHIVE ? XLOG_FROM_ANY :

currentSource);

if (readFile >= 0)

return true; /* success! */

 

/*

* Nope, not found in archive or pg_wal.

*/

lastSourceFailed = true;

break; 

Stage 2

当从pg_wal和archive读取完所有可用日志之后,将日志源切换为XLOG_FROM_STREAM(currentSource = XLOG_FROM_STREAM),并根据所请求日志记录的起始位置确定需要请求的时间线。

xlog.c 切换日志源

if (lastSourceFailed)

{

switch (currentSource)

{

case XLOG_FROM_ARCHIVE:

case XLOG_FROM_PG_WAL:

 

/*

* Check to see if the trigger file exists. Note that we

* do this only after failure, so when you create the

* trigger file, we still finish replaying as much as we

* can from archive and pg_wal before failover.

*/

if (StandbyMode && CheckForStandbyTrigger())

{

ShutdownWalRcv();

return false;

}

 

/*

* Not in standby mode, and we've now tried the archive

* and pg_wal.

*/

if (!StandbyMode)

return false;

 

/*

* Move to XLOG_FROM_STREAM state, and set to start a

* walreceiver if necessary.

*/

currentSource = XLOG_FROM_STREAM;

startWalReceiver = true;

break;

xlog.c 启动walreceiver 此时时间线tli=1

case XLOG_FROM_STREAM:

{

bool havedata;

 

/*

* We should be able to move to XLOG_FROM_STREAM only in

* standby mode.

*/

Assert(StandbyMode);

 

/*

* First, shutdown walreceiver if its restart has been

* requested -- but no point if we're already slated for

* starting it.

*/

if (pendingWalRcvRestart && !startWalReceiver)

{

ShutdownWalRcv();

 

/*

* Re-scan for possible new timelines if we were

* requested to recover to the latest timeline.

*/

if (recoveryTargetTimeLineGoal ==

RECOVERY_TARGET_TIMELINE_LATEST)

rescanLatestTimeLine();

 

startWalReceiver = true;

}

pendingWalRcvRestart = false;

 

/*

* Launch walreceiver if needed.

*

* If fetching_ckpt is true, RecPtr points to the initial

* checkpoint location. In that case, we use RedoStartLSN

* as the streaming start position instead of RecPtr, so

* that when we later jump backwards to start redo at

* RedoStartLSN, we will have the logs streamed already.

*/

if (startWalReceiver &&

PrimaryConnInfo && strcmp(PrimaryConnInfo, "") != 0)

{

XLogRecPtr ptr;

TimeLineID tli;

 

if (fetching_ckpt)

{

ptr = RedoStartLSN;

tli = ControlFile->checkPointCopy.ThisTimeLineID;

}

else

{

ptr = RecPtr;

 

/*

* Use the record begin position to determine the

* TLI, rather than the position we're reading.

*/

tli = tliOfPointInHistory(tliRecPtr, expectedTLEs);

 

if (curFileTLI > 0 && tli < curFileTLI)

elog(ERROR, "according to history file, WAL location %X/%X belongs to timeline %u, but previous recovered WAL file came from timeline %u",

LSN_FORMAT_ARGS(tliRecPtr),

tli, curFileTLI);

}

curFileTLI = tli;

RequestXLogStreaming(tli, ptr, PrimaryConnInfo,

PrimarySlotName,

wal_receiver_create_temp_slot);

flushedUpto = 0;

}

walreceiver.c:walreceiver建立时间线1上的主备流复制(if (walrcv_startstreaming(wrconn, &options)))。当时间线1上的日志传输结束后,新主库slave1会终止时间线1上的流复制(len = walrcv_receive(wrconn, &buf, &wait_fd); if (len < 0))。随后walreceiver进入WalRcvWaitForStartPosition函数,该函数通知WaitForWALToBecomeAvailable本次流复制已结束,并等待下一次主备流复制任务的初始化。

if (walrcv_startstreaming(wrconn, &options))

{

if (first_stream)

ereport(LOG,

(errmsg("started streaming WAL from primary at %X/%X on timeline %u",

LSN_FORMAT_ARGS(startpoint), startpointTLI)));

else

ereport(LOG,

(errmsg("restarted WAL streaming at %X/%X on timeline %u",

LSN_FORMAT_ARGS(startpoint), startpointTLI)));

first_stream = false;

 

/* Initialize LogstreamResult and buffers for processing messages */

LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);

initStringInfo(&reply_message);

initStringInfo(&incoming_message);

 

/* Initialize the last recv timestamp */

last_recv_timestamp = GetCurrentTimestamp();

ping_sent = false;

 

/* Loop until end-of-streaming or error */

for (;;)

{

char *buf;

int len;

bool endofwal = false;

pgsocket wait_fd = PGINVALID_SOCKET;

int rc;

 

/*

* Exit walreceiver if we're not in recovery. This should not

* happen, but cross-check the status here.

*/

if (!RecoveryInProgress())

ereport(FATAL,

(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

errmsg("cannot continue WAL streaming, recovery has already ended")));

 

/* Process any requests or signals received recently */

ProcessWalRcvInterrupts();

 

if (ConfigReloadPending)

{

ConfigReloadPending = false;

ProcessConfigFile(PGC_SIGHUP);

XLogWalRcvSendHSFeedback(true);

}

 

/* See if we can read data immediately */

len = walrcv_receive(wrconn, &buf, &wait_fd);

if (len != 0)

{

/*

* Process the received data, and any subsequent data we

* can read without blocking.

*/

for (;;)

{

if (len > 0)

{

/*

* Something was received from primary, so reset

* timeout

*/

last_recv_timestamp = GetCurrentTimestamp();

ping_sent = false;

XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);

}

else if (len == 0)

break;

else if (len < 0)

{

ereport(LOG,

(errmsg("replication terminated by primary server"),

errdetail("End of WAL reached on timeline %u at %X/%X.",

startpointTLI,

LSN_FORMAT_ARGS(LogstreamResult.Write))));

endofwal = true;

break;

}

len = walrcv_receive(wrconn, &buf, &wait_fd);

}

 

/* Let the primary know that we received some data. */

XLogWalRcvSendReply(false, false);

 

/*

* If we've written some records, flush them to disk and

* let the startup process and primary server know about

* them.

*/

XLogWalRcvFlush(false);

}

 

/* Check if we need to exit the streaming loop. */

if (endofwal)

break;

。。。

elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");

WalRcvWaitForStartPosition(&startpoint, &startpointTLI);

Stage 3

WaitForWALToBecomeAvailable检测到主备流复制关闭之后,会重新扫描时间线(rescanLatestTimeLine函数)并更新目标时间线,然后启动时间线2上的流复制(建立过程同Stage 2)。walreceiver进程接收到请求之后,重置receiveStart和receiveStartTLI,并发起时间线2上的主备流复制。

xlog.c

/*
 * rescanLatestTimeLine -- try to switch recovery to a newer target timeline.
 *
 * Asks findNewestTimeLine() for the newest timeline relative to the current
 * recoveryTargetTLI.  The new timeline is accepted only if:
 *   - its history (from readTimeLineHistory) contains the current
 *     recoveryTargetTLI, and
 *   - the current target timeline's entry in that history ends at or after
 *     the current recovery point (EndRecPtr).
 *
 * On acceptance, recoveryTargetTLI and expectedTLEs are replaced (the old
 * expectedTLEs list is freed), the history files between the old and new
 * targets are restored into pg_wal, and true is returned.
 *
 * Returns false, leaving recoveryTargetTLI and expectedTLEs untouched, when no
 * newer timeline is found or the newest one is rejected by the checks above.
 */
static bool
rescanLatestTimeLine(void)
{
	List	   *newExpectedTLEs;
	bool		found;
	ListCell   *cell;
	TimeLineID	newtarget;
	TimeLineID	oldtarget = recoveryTargetTLI;
	TimeLineHistoryEntry *currentTle = NULL;

	newtarget = findNewestTimeLine(recoveryTargetTLI);
	if (newtarget == recoveryTargetTLI)
	{
		/* No new timelines found */
		return false;
	}

	/*
	 * Determine the list of expected TLIs for the new TLI
	 */
	newExpectedTLEs = readTimeLineHistory(newtarget);

	/*
	 * If the current timeline is not part of the history of the new timeline,
	 * we cannot proceed to it.
	 */
	found = false;
	foreach(cell, newExpectedTLEs)
	{
		currentTle = (TimeLineHistoryEntry *) lfirst(cell);

		if (currentTle->tli == recoveryTargetTLI)
		{
			found = true;
			break;
		}
	}
	if (!found)
	{
		ereport(LOG,
				(errmsg("new timeline %u is not a child of database system timeline %u",
						newtarget,
						ThisTimeLineID)));

		/*
		 * The candidate history is rejected; release it the same way the
		 * success path releases the old expectedTLEs.  Otherwise every rescan
		 * that keeps finding this unusable timeline would leak another copy.
		 */
		list_free_deep(newExpectedTLEs);
		return false;
	}

	/*
	 * The current timeline was found in the history file, but check that the
	 * next timeline was forked off from it *after* the current recovery
	 * location.
	 */
	if (currentTle->end < EndRecPtr)
	{
		ereport(LOG,
				(errmsg("new timeline %u forked off current database system timeline %u before current recovery point %X/%X",
						newtarget,
						ThisTimeLineID,
						LSN_FORMAT_ARGS(EndRecPtr))));

		/* Rejected as well; currentTle points into this list, so free last. */
		list_free_deep(newExpectedTLEs);
		return false;
	}

	/* The new timeline history seems valid. Switch target */
	recoveryTargetTLI = newtarget;
	list_free_deep(expectedTLEs);
	expectedTLEs = newExpectedTLEs;

	/*
	 * As in StartupXLOG(), try to ensure we have all the history files
	 * between the old target and new target in pg_wal.
	 */
	restoreTimeLineHistoryFiles(oldtarget + 1, newtarget);

	ereport(LOG,
			(errmsg("new target timeline is %u",
					recoveryTargetTLI)));

	return true;
}

从日志角度看启动过程如下:

 

He3DB 是由移动云数据库团队研发的一款计算/存储分离的云原生数据库,He3DB通过计算/存储分离、数据冷热分层和压缩、智能中间件等技术,来保证高性能和低成本完美兼得,在获得高性能的同时,最大化的帮助客户节省数据库使用成本。

场景描述:

  1. master、slave1、slave2
  2. kill -9 slave2
  3. master do some insert
  4. slave1 promote
  5. slave2 connect to slave1

本场景涉及到startup进程和walreceiver,主要涉及的函数有WaitForWALToBecomeAvailable和WalReceiverMain。WaitForWALToBecomeAvailable函数用于打开包含指定LSN wal日志的segment文件,如果处于备机模式,将会一直等待新日志同步。在这个过程中需要实现日志源切换、时间线选取、流复制建立、walreceiver进程消息监听。WalReceiverMain负责与主机通信获取指定wal日志。

下面详细说明第五步slave2连接到新主slave1期间的主备流复制建立过程。

Stage 1

slave2首先进入常规的crash recovery流程,从pg_wal和archive读取日志进行replay,此时currentSource==XLOG_FROM_ARCHIVE。

xlog.c 从pg_wal和archive读取日志

switch (currentSource)

{

case XLOG_FROM_ARCHIVE:

case XLOG_FROM_PG_WAL:

 

/*

* WAL receiver must not be running when reading WAL from

* archive or pg_wal.

*/

Assert(!WalRcvStreaming());

 

/* Close any old file we might have open. */

if (readFile >= 0)

{

close(readFile);

readFile = -1;

}

/* Reset curFileTLI if random fetch. */

if (randAccess)

curFileTLI = 0;

 

/*

* Try to restore the file from archive, or read an existing

* file from pg_wal.

*/

readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2,

currentSource == XLOG_FROM_ARCHIVE ? XLOG_FROM_ANY :

currentSource);

if (readFile >= 0)

return true; /* success! */

 

/*

* Nope, not found in archive or pg_wal.

*/

lastSourceFailed = true;

break; 

Stage 2

当从pg_wal和archive读取完所有可用日志之后,将日志源切换为XLOG_FROM_STREAM(currentSource = XLOG_FROM_STREAM),并根据所请求日志记录的起始位置确定需要请求的时间线。

xlog.c 切换日志源

if (lastSourceFailed)

{

switch (currentSource)

{

case XLOG_FROM_ARCHIVE:

case XLOG_FROM_PG_WAL:

 

/*

* Check to see if the trigger file exists. Note that we

* do this only after failure, so when you create the

* trigger file, we still finish replaying as much as we

* can from archive and pg_wal before failover.

*/

if (StandbyMode && CheckForStandbyTrigger())

{

ShutdownWalRcv();

return false;

}

 

/*

* Not in standby mode, and we've now tried the archive

* and pg_wal.

*/

if (!StandbyMode)

return false;

 

/*

* Move to XLOG_FROM_STREAM state, and set to start a

* walreceiver if necessary.

*/

currentSource = XLOG_FROM_STREAM;

startWalReceiver = true;

break;

xlog.c 启动walreceiver 此时时间线tli=1

case XLOG_FROM_STREAM:

{

bool havedata;

 

/*

* We should be able to move to XLOG_FROM_STREAM only in

* standby mode.

*/

Assert(StandbyMode);

 

/*

* First, shutdown walreceiver if its restart has been

* requested -- but no point if we're already slated for

* starting it.

*/

if (pendingWalRcvRestart && !startWalReceiver)

{

ShutdownWalRcv();

 

/*

* Re-scan for possible new timelines if we were

* requested to recover to the latest timeline.

*/

if (recoveryTargetTimeLineGoal ==

RECOVERY_TARGET_TIMELINE_LATEST)

rescanLatestTimeLine();

 

startWalReceiver = true;

}

pendingWalRcvRestart = false;

 

/*

* Launch walreceiver if needed.

*

* If fetching_ckpt is true, RecPtr points to the initial

* checkpoint location. In that case, we use RedoStartLSN

* as the streaming start position instead of RecPtr, so

* that when we later jump backwards to start redo at

* RedoStartLSN, we will have the logs streamed already.

*/

if (startWalReceiver &&

PrimaryConnInfo && strcmp(PrimaryConnInfo, "") != 0)

{

XLogRecPtr ptr;

TimeLineID tli;

 

if (fetching_ckpt)

{

ptr = RedoStartLSN;

tli = ControlFile->checkPointCopy.ThisTimeLineID;

}

else

{

ptr = RecPtr;

 

/*

* Use the record begin position to determine the

* TLI, rather than the position we're reading.

*/

tli = tliOfPointInHistory(tliRecPtr, expectedTLEs);

 

if (curFileTLI > 0 && tli < curFileTLI)

elog(ERROR, "according to history file, WAL location %X/%X belongs to timeline %u, but previous recovered WAL file came from timeline %u",

LSN_FORMAT_ARGS(tliRecPtr),

tli, curFileTLI);

}

curFileTLI = tli;

RequestXLogStreaming(tli, ptr, PrimaryConnInfo,

PrimarySlotName,

wal_receiver_create_temp_slot);

flushedUpto = 0;

}

walreceiver.c:walreceiver建立时间线1上的主备流复制(if (walrcv_startstreaming(wrconn, &options)))。当时间线1上的日志传输结束后,新主库slave1会终止时间线1上的流复制(len = walrcv_receive(wrconn, &buf, &wait_fd); if (len < 0))。随后walreceiver进入WalRcvWaitForStartPosition函数,该函数通知WaitForWALToBecomeAvailable本次流复制已结束,并等待下一次主备流复制任务的初始化。

if (walrcv_startstreaming(wrconn, &options))

{

if (first_stream)

ereport(LOG,

(errmsg("started streaming WAL from primary at %X/%X on timeline %u",

LSN_FORMAT_ARGS(startpoint), startpointTLI)));

else

ereport(LOG,

(errmsg("restarted WAL streaming at %X/%X on timeline %u",

LSN_FORMAT_ARGS(startpoint), startpointTLI)));

first_stream = false;

 

/* Initialize LogstreamResult and buffers for processing messages */

LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);

initStringInfo(&reply_message);

initStringInfo(&incoming_message);

 

/* Initialize the last recv timestamp */

last_recv_timestamp = GetCurrentTimestamp();

ping_sent = false;

 

/* Loop until end-of-streaming or error */

for (;;)

{

char *buf;

int len;

bool endofwal = false;

pgsocket wait_fd = PGINVALID_SOCKET;

int rc;

 

/*

* Exit walreceiver if we're not in recovery. This should not

* happen, but cross-check the status here.

*/

if (!RecoveryInProgress())

ereport(FATAL,

(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

errmsg("cannot continue WAL streaming, recovery has already ended")));

 

/* Process any requests or signals received recently */

ProcessWalRcvInterrupts();

 

if (ConfigReloadPending)

{

ConfigReloadPending = false;

ProcessConfigFile(PGC_SIGHUP);

XLogWalRcvSendHSFeedback(true);

}

 

/* See if we can read data immediately */

len = walrcv_receive(wrconn, &buf, &wait_fd);

if (len != 0)

{

/*

* Process the received data, and any subsequent data we

* can read without blocking.

*/

for (;;)

{

if (len > 0)

{

/*

* Something was received from primary, so reset

* timeout

*/

last_recv_timestamp = GetCurrentTimestamp();

ping_sent = false;

XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);

}

else if (len == 0)

break;

else if (len < 0)

{

ereport(LOG,

(errmsg("replication terminated by primary server"),

errdetail("End of WAL reached on timeline %u at %X/%X.",

startpointTLI,

LSN_FORMAT_ARGS(LogstreamResult.Write))));

endofwal = true;

break;

}

len = walrcv_receive(wrconn, &buf, &wait_fd);

}

 

/* Let the primary know that we received some data. */

XLogWalRcvSendReply(false, false);

 

/*

* If we've written some records, flush them to disk and

* let the startup process and primary server know about

* them.

*/

XLogWalRcvFlush(false);

}

 

/* Check if we need to exit the streaming loop. */

if (endofwal)

break;

。。。

elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");

WalRcvWaitForStartPosition(&startpoint, &startpointTLI);

Stage 3

WaitForWALToBecomeAvailable检测到主备流复制关闭之后,会重新扫描时间线(rescanLatestTimeLine函数)并更新目标时间线,然后启动时间线2上的流复制(建立过程同Stage 2)。walreceiver进程接收到请求之后,重置receiveStart和receiveStartTLI,并发起时间线2上的主备流复制。

xlog.c

/*
 * rescanLatestTimeLine -- try to switch recovery to a newer target timeline.
 *
 * Asks findNewestTimeLine() for the newest timeline relative to the current
 * recoveryTargetTLI.  The new timeline is accepted only if:
 *   - its history (from readTimeLineHistory) contains the current
 *     recoveryTargetTLI, and
 *   - the current target timeline's entry in that history ends at or after
 *     the current recovery point (EndRecPtr).
 *
 * On acceptance, recoveryTargetTLI and expectedTLEs are replaced (the old
 * expectedTLEs list is freed), the history files between the old and new
 * targets are restored into pg_wal, and true is returned.
 *
 * Returns false, leaving recoveryTargetTLI and expectedTLEs untouched, when no
 * newer timeline is found or the newest one is rejected by the checks above.
 */
static bool
rescanLatestTimeLine(void)
{
	List	   *newExpectedTLEs;
	bool		found;
	ListCell   *cell;
	TimeLineID	newtarget;
	TimeLineID	oldtarget = recoveryTargetTLI;
	TimeLineHistoryEntry *currentTle = NULL;

	newtarget = findNewestTimeLine(recoveryTargetTLI);
	if (newtarget == recoveryTargetTLI)
	{
		/* No new timelines found */
		return false;
	}

	/*
	 * Determine the list of expected TLIs for the new TLI
	 */
	newExpectedTLEs = readTimeLineHistory(newtarget);

	/*
	 * If the current timeline is not part of the history of the new timeline,
	 * we cannot proceed to it.
	 */
	found = false;
	foreach(cell, newExpectedTLEs)
	{
		currentTle = (TimeLineHistoryEntry *) lfirst(cell);

		if (currentTle->tli == recoveryTargetTLI)
		{
			found = true;
			break;
		}
	}
	if (!found)
	{
		ereport(LOG,
				(errmsg("new timeline %u is not a child of database system timeline %u",
						newtarget,
						ThisTimeLineID)));

		/*
		 * The candidate history is rejected; release it the same way the
		 * success path releases the old expectedTLEs.  Otherwise every rescan
		 * that keeps finding this unusable timeline would leak another copy.
		 */
		list_free_deep(newExpectedTLEs);
		return false;
	}

	/*
	 * The current timeline was found in the history file, but check that the
	 * next timeline was forked off from it *after* the current recovery
	 * location.
	 */
	if (currentTle->end < EndRecPtr)
	{
		ereport(LOG,
				(errmsg("new timeline %u forked off current database system timeline %u before current recovery point %X/%X",
						newtarget,
						ThisTimeLineID,
						LSN_FORMAT_ARGS(EndRecPtr))));

		/* Rejected as well; currentTle points into this list, so free last. */
		list_free_deep(newExpectedTLEs);
		return false;
	}

	/* The new timeline history seems valid. Switch target */
	recoveryTargetTLI = newtarget;
	list_free_deep(expectedTLEs);
	expectedTLEs = newExpectedTLEs;

	/*
	 * As in StartupXLOG(), try to ensure we have all the history files
	 * between the old target and new target in pg_wal.
	 */
	restoreTimeLineHistoryFiles(oldtarget + 1, newtarget);

	ereport(LOG,
			(errmsg("new target timeline is %u",
					recoveryTargetTLI)));

	return true;
}

从日志角度看启动过程如下:

 

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论