He3DB 是由移动云数据库团队研发的一款计算/存储分离的云原生数据库。He3DB通过计算/存储分离、数据冷热分层与压缩、智能中间件等技术,兼顾高性能与低成本,在获得高性能的同时,最大化地帮助客户节省数据库使用成本。
场景描述:
- master、slave1、slave2
- kill -9 slave2
- master do some insert
- slave1 promote
- slave2 connect to slave1
本场景涉及startup进程和walreceiver进程,主要涉及的函数有WaitForWALToBecomeAvailable和WalReceiverMain。WaitForWALToBecomeAvailable函数用于打开包含指定LSN的WAL日志所在的segment文件;如果处于备机(standby)模式,它会一直等待新的日志同步过来。在这个过程中需要完成日志源切换、时间线选取、流复制建立以及对walreceiver进程消息的监听。WalReceiverMain负责与主库通信,获取指定的WAL日志。
下面详细说明第五步(slave2 connect to slave1,即slave2连接到新主slave1)期间主备流复制的建立过程。
Stage 1
slave2首先进入常规的crash recovery流程,从pg_wal和archive读取日志进行replay,此时currentSource==XLOG_FROM_ARCHIVE。
xlog.c 从pg_wal和archive读取日志
switch (currentSource)
{
case XLOG_FROM_ARCHIVE:
case XLOG_FROM_PG_WAL:
/*
* WAL receiver must not be running when reading WAL from
* archive or pg_wal.
*/
Assert(!WalRcvStreaming());
/* Close any old file we might have open. */
if (readFile >= 0)
{
close(readFile);
readFile = -1;
}
/* Reset curFileTLI if random fetch. */
if (randAccess)
curFileTLI = 0;
/*
* Try to restore the file from archive, or read an existing
* file from pg_wal.
*/
readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2,
currentSource == XLOG_FROM_ARCHIVE ? XLOG_FROM_ANY :
currentSource);
if (readFile >= 0)
return true; /* success! */
/*
* Nope, not found in archive or pg_wal.
*/
lastSourceFailed = true;
break;
Stage 2
当从pg_wal和archive读取完所有日志之后,将日志源切换为流复制(currentSource = XLOG_FROM_STREAM),并根据所请求日志的起始位置确定要请求的时间线。
xlog.c 切换日志源
if (lastSourceFailed)
{
switch (currentSource)
{
case XLOG_FROM_ARCHIVE:
case XLOG_FROM_PG_WAL:
/*
* Check to see if the trigger file exists. Note that we
* do this only after failure, so when you create the
* trigger file, we still finish replaying as much as we
* can from archive and pg_wal before failover.
*/
if (StandbyMode && CheckForStandbyTrigger())
{
ShutdownWalRcv();
return false;
}
/*
* Not in standby mode, and we've now tried the archive
* and pg_wal.
*/
if (!StandbyMode)
return false;
/*
* Move to XLOG_FROM_STREAM state, and set to start a
* walreceiver if necessary.
*/
currentSource = XLOG_FROM_STREAM;
startWalReceiver = true;
break;
xlog.c 启动walreceiver,此时时间线tli=1
case XLOG_FROM_STREAM:
{
bool havedata;
/*
* We should be able to move to XLOG_FROM_STREAM only in
* standby mode.
*/
Assert(StandbyMode);
/*
* First, shutdown walreceiver if its restart has been
* requested -- but no point if we're already slated for
* starting it.
*/
if (pendingWalRcvRestart && !startWalReceiver)
{
ShutdownWalRcv();
/*
* Re-scan for possible new timelines if we were
* requested to recover to the latest timeline.
*/
if (recoveryTargetTimeLineGoal ==
RECOVERY_TARGET_TIMELINE_LATEST)
rescanLatestTimeLine();
startWalReceiver = true;
}
pendingWalRcvRestart = false;
/*
* Launch walreceiver if needed.
*
* If fetching_ckpt is true, RecPtr points to the initial
* checkpoint location. In that case, we use RedoStartLSN
* as the streaming start position instead of RecPtr, so
* that when we later jump backwards to start redo at
* RedoStartLSN, we will have the logs streamed already.
*/
if (startWalReceiver &&
PrimaryConnInfo && strcmp(PrimaryConnInfo, "") != 0)
{
XLogRecPtr ptr;
TimeLineID tli;
if (fetching_ckpt)
{
ptr = RedoStartLSN;
tli = ControlFile->checkPointCopy.ThisTimeLineID;
}
else
{
ptr = RecPtr;
/*
* Use the record begin position to determine the
* TLI, rather than the position we're reading.
*/
tli = tliOfPointInHistory(tliRecPtr, expectedTLEs);
if (curFileTLI > 0 && tli < curFileTLI)
elog(ERROR, "according to history file, WAL location %X/%X belongs to timeline %u, but previous recovered WAL file came from timeline %u",
LSN_FORMAT_ARGS(tliRecPtr),
tli, curFileTLI);
}
curFileTLI = tli;
RequestXLogStreaming(tli, ptr, PrimaryConnInfo,
PrimarySlotName,
wal_receiver_create_temp_slot);
flushedUpto = 0;
}
walreceiver.c:建立时间线为1的主备流复制(if (walrcv_startstreaming(wrconn, &options)))。当时间线1上的日志传输完毕后,主库(即新主slave1)会终止时间线1上的流复制(len = walrcv_receive(wrconn, &buf, &wait_fd); if (len < 0)),walreceiver随即进入WalRcvWaitForStartPosition函数。该函数通知WaitForWALToBecomeAvailable本次流复制已结束,并等待下一次主备流复制任务的初始化。
if (walrcv_startstreaming(wrconn, &options))
{
if (first_stream)
ereport(LOG,
(errmsg("started streaming WAL from primary at %X/%X on timeline %u",
LSN_FORMAT_ARGS(startpoint), startpointTLI)));
else
ereport(LOG,
(errmsg("restarted WAL streaming at %X/%X on timeline %u",
LSN_FORMAT_ARGS(startpoint), startpointTLI)));
first_stream = false;
/* Initialize LogstreamResult and buffers for processing messages */
LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
initStringInfo(&reply_message);
initStringInfo(&incoming_message);
/* Initialize the last recv timestamp */
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
/* Loop until end-of-streaming or error */
for (;;)
{
char *buf;
int len;
bool endofwal = false;
pgsocket wait_fd = PGINVALID_SOCKET;
int rc;
/*
* Exit walreceiver if we're not in recovery. This should not
* happen, but cross-check the status here.
*/
if (!RecoveryInProgress())
ereport(FATAL,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot continue WAL streaming, recovery has already ended")));
/* Process any requests or signals received recently */
ProcessWalRcvInterrupts();
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
XLogWalRcvSendHSFeedback(true);
}
/* See if we can read data immediately */
len = walrcv_receive(wrconn, &buf, &wait_fd);
if (len != 0)
{
/*
* Process the received data, and any subsequent data we
* can read without blocking.
*/
for (;;)
{
if (len > 0)
{
/*
* Something was received from primary, so reset
* timeout
*/
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
}
else if (len == 0)
break;
else if (len < 0)
{
ereport(LOG,
(errmsg("replication terminated by primary server"),
errdetail("End of WAL reached on timeline %u at %X/%X.",
startpointTLI,
LSN_FORMAT_ARGS(LogstreamResult.Write))));
endofwal = true;
break;
}
len = walrcv_receive(wrconn, &buf, &wait_fd);
}
/* Let the primary know that we received some data. */
XLogWalRcvSendReply(false, false);
/*
* If we've written some records, flush them to disk and
* let the startup process and primary server know about
* them.
*/
XLogWalRcvFlush(false);
}
/* Check if we need to exit the streaming loop. */
if (endofwal)
break;
。。。
elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
Stage 3
WaitForWALToBecomeAvailable检测到主备流复制结束之后,重新扫描时间线(rescanLatestTimeLine函数)并更新目标时间线,然后发起时间线为2的流复制(建立过程同Stage 2)。walreceiver进程收到请求后,重置receiveStart和receiveStartTLI,并开始时间线为2的主备流复制。
xlog.c
/*
 * rescanLatestTimeLine -- look for a timeline newer than recoveryTargetTLI
 * and, if it is a valid continuation of it, make it the recovery target.
 *
 * Returns true if recoveryTargetTLI was switched to a newer timeline.
 * Returns false, leaving recoveryTargetTLI and expectedTLEs untouched, when:
 *   - findNewestTimeLine() reports nothing newer than recoveryTargetTLI;
 *   - recoveryTargetTLI does not appear in the new timeline's history; or
 *   - the new timeline branched off recoveryTargetTLI before EndRecPtr
 *     (the current recovery point).
 *
 * On success it sets recoveryTargetTLI to the new timeline, frees the old
 * expectedTLEs list and installs the new history in its place, calls
 * restoreTimeLineHistoryFiles(oldtarget + 1, newtarget), and logs the new
 * target timeline at LOG level.
 */
static bool
rescanLatestTimeLine(void)
{
List *newExpectedTLEs;
bool found;
ListCell *cell;
TimeLineID newtarget;
/* Remembered so the history-file restore below starts after the old target. */
TimeLineID oldtarget = recoveryTargetTLI;
TimeLineHistoryEntry *currentTle = NULL;
newtarget = findNewestTimeLine(recoveryTargetTLI);
if (newtarget == recoveryTargetTLI)
{
/* No new timelines found */
return false;
}
/*
* Determine the list of expected TLIs for the new TLI
*/
newExpectedTLEs = readTimeLineHistory(newtarget);
/*
* If the current timeline is not part of the history of the new timeline,
* we cannot proceed to it.
*
* On a match, currentTle is left pointing at the history entry for
* recoveryTargetTLI; its 'end' field is used in the fork-point check below.
*/
found = false;
foreach(cell, newExpectedTLEs)
{
currentTle = (TimeLineHistoryEntry *) lfirst(cell);
if (currentTle->tli == recoveryTargetTLI)
{
found = true;
break;
}
}
if (!found)
{
/*
* NOTE(review): newExpectedTLEs is not freed on this early-return path.
* The message also reports ThisTimeLineID, while the check above is
* against recoveryTargetTLI; presumably they coincide here -- confirm.
*/
ereport(LOG,
(errmsg("new timeline %u is not a child of database system timeline %u",
newtarget,
ThisTimeLineID)));
return false;
}
/*
* The current timeline was found in the history file, but check that the
* next timeline was forked off from it *after* the current recovery
* location.
*/
if (currentTle->end < EndRecPtr)
{
/* NOTE(review): newExpectedTLEs is not freed on this path either. */
ereport(LOG,
(errmsg("new timeline %u forked off current database system timeline %u before current recovery point %X/%X",
newtarget,
ThisTimeLineID,
LSN_FORMAT_ARGS(EndRecPtr))));
return false;
}
/* The new timeline history seems valid. Switch target */
recoveryTargetTLI = newtarget;
/* Replace the expected-timeline list; the old list is released first. */
list_free_deep(expectedTLEs);
expectedTLEs = newExpectedTLEs;
/*
* As in StartupXLOG(), try to ensure we have all the history files
* between the old target and new target in pg_wal.
*/
restoreTimeLineHistoryFiles(oldtarget + 1, newtarget);
ereport(LOG,
(errmsg("new target timeline is %u",
recoveryTargetTLI)));
return true;
}
从日志角度看启动过程如下:
He3DB 是由移动云数据库团队研发的一款计算/存储分离的云原生数据库。He3DB通过计算/存储分离、数据冷热分层与压缩、智能中间件等技术,兼顾高性能与低成本,在获得高性能的同时,最大化地帮助客户节省数据库使用成本。
场景描述:
- master、slave1、slave2
- kill -9 slave2
- master do some insert
- slave1 promote
- slave2 connect to slave1
本场景涉及startup进程和walreceiver进程,主要涉及的函数有WaitForWALToBecomeAvailable和WalReceiverMain。WaitForWALToBecomeAvailable函数用于打开包含指定LSN的WAL日志所在的segment文件;如果处于备机(standby)模式,它会一直等待新的日志同步过来。在这个过程中需要完成日志源切换、时间线选取、流复制建立以及对walreceiver进程消息的监听。WalReceiverMain负责与主库通信,获取指定的WAL日志。
下面详细说明第五步(slave2 connect to slave1,即slave2连接到新主slave1)期间主备流复制的建立过程。
Stage 1
slave2首先进入常规的crash recovery流程,从pg_wal和archive读取日志进行replay,此时currentSource==XLOG_FROM_ARCHIVE。
xlog.c 从pg_wal和archive读取日志
switch (currentSource)
{
case XLOG_FROM_ARCHIVE:
case XLOG_FROM_PG_WAL:
/*
* WAL receiver must not be running when reading WAL from
* archive or pg_wal.
*/
Assert(!WalRcvStreaming());
/* Close any old file we might have open. */
if (readFile >= 0)
{
close(readFile);
readFile = -1;
}
/* Reset curFileTLI if random fetch. */
if (randAccess)
curFileTLI = 0;
/*
* Try to restore the file from archive, or read an existing
* file from pg_wal.
*/
readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2,
currentSource == XLOG_FROM_ARCHIVE ? XLOG_FROM_ANY :
currentSource);
if (readFile >= 0)
return true; /* success! */
/*
* Nope, not found in archive or pg_wal.
*/
lastSourceFailed = true;
break;
Stage 2
当从pg_wal和archive读取完所有日志之后,将日志源切换为流复制(currentSource = XLOG_FROM_STREAM),并根据所请求日志的起始位置确定要请求的时间线。
xlog.c 切换日志源
if (lastSourceFailed)
{
switch (currentSource)
{
case XLOG_FROM_ARCHIVE:
case XLOG_FROM_PG_WAL:
/*
* Check to see if the trigger file exists. Note that we
* do this only after failure, so when you create the
* trigger file, we still finish replaying as much as we
* can from archive and pg_wal before failover.
*/
if (StandbyMode && CheckForStandbyTrigger())
{
ShutdownWalRcv();
return false;
}
/*
* Not in standby mode, and we've now tried the archive
* and pg_wal.
*/
if (!StandbyMode)
return false;
/*
* Move to XLOG_FROM_STREAM state, and set to start a
* walreceiver if necessary.
*/
currentSource = XLOG_FROM_STREAM;
startWalReceiver = true;
break;
xlog.c 启动walreceiver,此时时间线tli=1
case XLOG_FROM_STREAM:
{
bool havedata;
/*
* We should be able to move to XLOG_FROM_STREAM only in
* standby mode.
*/
Assert(StandbyMode);
/*
* First, shutdown walreceiver if its restart has been
* requested -- but no point if we're already slated for
* starting it.
*/
if (pendingWalRcvRestart && !startWalReceiver)
{
ShutdownWalRcv();
/*
* Re-scan for possible new timelines if we were
* requested to recover to the latest timeline.
*/
if (recoveryTargetTimeLineGoal ==
RECOVERY_TARGET_TIMELINE_LATEST)
rescanLatestTimeLine();
startWalReceiver = true;
}
pendingWalRcvRestart = false;
/*
* Launch walreceiver if needed.
*
* If fetching_ckpt is true, RecPtr points to the initial
* checkpoint location. In that case, we use RedoStartLSN
* as the streaming start position instead of RecPtr, so
* that when we later jump backwards to start redo at
* RedoStartLSN, we will have the logs streamed already.
*/
if (startWalReceiver &&
PrimaryConnInfo && strcmp(PrimaryConnInfo, "") != 0)
{
XLogRecPtr ptr;
TimeLineID tli;
if (fetching_ckpt)
{
ptr = RedoStartLSN;
tli = ControlFile->checkPointCopy.ThisTimeLineID;
}
else
{
ptr = RecPtr;
/*
* Use the record begin position to determine the
* TLI, rather than the position we're reading.
*/
tli = tliOfPointInHistory(tliRecPtr, expectedTLEs);
if (curFileTLI > 0 && tli < curFileTLI)
elog(ERROR, "according to history file, WAL location %X/%X belongs to timeline %u, but previous recovered WAL file came from timeline %u",
LSN_FORMAT_ARGS(tliRecPtr),
tli, curFileTLI);
}
curFileTLI = tli;
RequestXLogStreaming(tli, ptr, PrimaryConnInfo,
PrimarySlotName,
wal_receiver_create_temp_slot);
flushedUpto = 0;
}
walreceiver.c:建立时间线为1的主备流复制(if (walrcv_startstreaming(wrconn, &options)))。当时间线1上的日志传输完毕后,主库(即新主slave1)会终止时间线1上的流复制(len = walrcv_receive(wrconn, &buf, &wait_fd); if (len < 0)),walreceiver随即进入WalRcvWaitForStartPosition函数。该函数通知WaitForWALToBecomeAvailable本次流复制已结束,并等待下一次主备流复制任务的初始化。
if (walrcv_startstreaming(wrconn, &options))
{
if (first_stream)
ereport(LOG,
(errmsg("started streaming WAL from primary at %X/%X on timeline %u",
LSN_FORMAT_ARGS(startpoint), startpointTLI)));
else
ereport(LOG,
(errmsg("restarted WAL streaming at %X/%X on timeline %u",
LSN_FORMAT_ARGS(startpoint), startpointTLI)));
first_stream = false;
/* Initialize LogstreamResult and buffers for processing messages */
LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
initStringInfo(&reply_message);
initStringInfo(&incoming_message);
/* Initialize the last recv timestamp */
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
/* Loop until end-of-streaming or error */
for (;;)
{
char *buf;
int len;
bool endofwal = false;
pgsocket wait_fd = PGINVALID_SOCKET;
int rc;
/*
* Exit walreceiver if we're not in recovery. This should not
* happen, but cross-check the status here.
*/
if (!RecoveryInProgress())
ereport(FATAL,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot continue WAL streaming, recovery has already ended")));
/* Process any requests or signals received recently */
ProcessWalRcvInterrupts();
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
XLogWalRcvSendHSFeedback(true);
}
/* See if we can read data immediately */
len = walrcv_receive(wrconn, &buf, &wait_fd);
if (len != 0)
{
/*
* Process the received data, and any subsequent data we
* can read without blocking.
*/
for (;;)
{
if (len > 0)
{
/*
* Something was received from primary, so reset
* timeout
*/
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
}
else if (len == 0)
break;
else if (len < 0)
{
ereport(LOG,
(errmsg("replication terminated by primary server"),
errdetail("End of WAL reached on timeline %u at %X/%X.",
startpointTLI,
LSN_FORMAT_ARGS(LogstreamResult.Write))));
endofwal = true;
break;
}
len = walrcv_receive(wrconn, &buf, &wait_fd);
}
/* Let the primary know that we received some data. */
XLogWalRcvSendReply(false, false);
/*
* If we've written some records, flush them to disk and
* let the startup process and primary server know about
* them.
*/
XLogWalRcvFlush(false);
}
/* Check if we need to exit the streaming loop. */
if (endofwal)
break;
。。。
elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
Stage 3
WaitForWALToBecomeAvailable检测到主备流复制结束之后,重新扫描时间线(rescanLatestTimeLine函数)并更新目标时间线,然后发起时间线为2的流复制(建立过程同Stage 2)。walreceiver进程收到请求后,重置receiveStart和receiveStartTLI,并开始时间线为2的主备流复制。
xlog.c
/*
 * rescanLatestTimeLine -- look for a timeline newer than recoveryTargetTLI
 * and, if it is a valid continuation of it, make it the recovery target.
 *
 * Returns true if recoveryTargetTLI was switched to a newer timeline.
 * Returns false, leaving recoveryTargetTLI and expectedTLEs untouched, when:
 *   - findNewestTimeLine() reports nothing newer than recoveryTargetTLI;
 *   - recoveryTargetTLI does not appear in the new timeline's history; or
 *   - the new timeline branched off recoveryTargetTLI before EndRecPtr
 *     (the current recovery point).
 *
 * On success it sets recoveryTargetTLI to the new timeline, frees the old
 * expectedTLEs list and installs the new history in its place, calls
 * restoreTimeLineHistoryFiles(oldtarget + 1, newtarget), and logs the new
 * target timeline at LOG level.
 */
static bool
rescanLatestTimeLine(void)
{
List *newExpectedTLEs;
bool found;
ListCell *cell;
TimeLineID newtarget;
/* Remembered so the history-file restore below starts after the old target. */
TimeLineID oldtarget = recoveryTargetTLI;
TimeLineHistoryEntry *currentTle = NULL;
newtarget = findNewestTimeLine(recoveryTargetTLI);
if (newtarget == recoveryTargetTLI)
{
/* No new timelines found */
return false;
}
/*
* Determine the list of expected TLIs for the new TLI
*/
newExpectedTLEs = readTimeLineHistory(newtarget);
/*
* If the current timeline is not part of the history of the new timeline,
* we cannot proceed to it.
*
* On a match, currentTle is left pointing at the history entry for
* recoveryTargetTLI; its 'end' field is used in the fork-point check below.
*/
found = false;
foreach(cell, newExpectedTLEs)
{
currentTle = (TimeLineHistoryEntry *) lfirst(cell);
if (currentTle->tli == recoveryTargetTLI)
{
found = true;
break;
}
}
if (!found)
{
/*
* NOTE(review): newExpectedTLEs is not freed on this early-return path.
* The message also reports ThisTimeLineID, while the check above is
* against recoveryTargetTLI; presumably they coincide here -- confirm.
*/
ereport(LOG,
(errmsg("new timeline %u is not a child of database system timeline %u",
newtarget,
ThisTimeLineID)));
return false;
}
/*
* The current timeline was found in the history file, but check that the
* next timeline was forked off from it *after* the current recovery
* location.
*/
if (currentTle->end < EndRecPtr)
{
/* NOTE(review): newExpectedTLEs is not freed on this path either. */
ereport(LOG,
(errmsg("new timeline %u forked off current database system timeline %u before current recovery point %X/%X",
newtarget,
ThisTimeLineID,
LSN_FORMAT_ARGS(EndRecPtr))));
return false;
}
/* The new timeline history seems valid. Switch target */
recoveryTargetTLI = newtarget;
/* Replace the expected-timeline list; the old list is released first. */
list_free_deep(expectedTLEs);
expectedTLEs = newExpectedTLEs;
/*
* As in StartupXLOG(), try to ensure we have all the history files
* between the old target and new target in pg_wal.
*/
restoreTimeLineHistoryFiles(oldtarget + 1, newtarget);
ereport(LOG,
(errmsg("new target timeline is %u",
recoveryTargetTLI)));
return true;
}
从日志角度看启动过程如下:




