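Original author: 杨明翰
I. Symptom
When MogDB runs a full backup with brm, the log keeps printing "keepalive message is received". What does this message mean, and does it indicate a problem with the backup? It often confuses customers and front-line support staff, so this article traces where it comes from.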

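II. Analysis
brm is the backup and recovery tool provided by MogDB. It supports full, incremental, and WAL log backups, and makes it easy to restore to a specified backup set, LSN, or point in time. A full backup with brm does not affect normal database operation. The process is similar to pg_basebackup: the replication protocol is used to fetch the WAL generated while the backup runs.
gs_basebackup uses the ReceiveXlogStream function to fetch WAL starting from a specified position. While receiving WAL, if a replication slot is in use, the function reports its current flush position so that the primary can safely remove WAL that is no longer needed.
If the primary produces no new WAL, ReceiveXlogStream keeps blocking on the read. Waiting forever would be a problem if the primary or the network link has failed, so ReceiveXlogStream calls checkForReceiveTimeout to check for a timeout. If no new message has arrived for standby_message_timeout/2, it proactively sends a message to the primary to ping it and check that it is still alive. If there is still no message after standby_message_timeout, it reports an error and exits. standby_message_timeout defaults to 10 seconds.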
/*
 * @@GaussDB@@
 * Brief : check for receive timeout
 * Description :
 * Check if configured timeout has reached without receiving anything from server. If yes then assume that
 * connection broken. If timeout has not reached but half of timeout has reached without receiving anything, then send a
 * message to server along with request for an immediate reply.
 */
static bool checkForReceiveTimeout(PGconn* conn)
{
    /*
     * Check if time since last receive from master has reached the
     * configured limit.
     */
    if (standby_message_timeout > 0) {
        TimestampTz nowtime = localGetCurrentTimestamp();
        /*
         * We didn't receive anything new, for half of receiver
         * replication timeout. Ping the server.
         */
        if (localTimestampDifferenceExceeds(last_recv_timestamp, nowtime, (standby_message_timeout / 2))) {
            if (ping_sent == false) {
                if (sendReplyToSender(conn, nowtime, true) == false) {
                    return false;
                }
                ping_sent = true;
                last_recv_timestamp = nowtime;
            } else {
                pg_log(PG_PRINT, _("\nterminating XLogStream receiver due to timeout\n"));
                return false;
            }
        }
    }
    return true;
}
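
The two-stage behavior (ping after half the timeout, give up after the second half) can be easier to follow with the clock and the connection factored out. The following is a minimal sketch, not the gs_basebackup source: RecvAction, RecvState, check_receive_timeout and note_message_received are hypothetical names, and resetting ping_sent when a message arrives is an assumption, since that part is not shown in the excerpt.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical names throughout; this is not the gs_basebackup source. */
typedef enum { RECV_WAIT, RECV_SEND_PING, RECV_TIMED_OUT } RecvAction;

typedef struct {
    int64_t last_recv_ms; /* time of the last message, or of the last ping sent */
    bool ping_sent;       /* true while a ping is outstanding */
} RecvState;

/* Decide what to do at time now_ms, given a full timeout of timeout_ms. */
static RecvAction check_receive_timeout(RecvState *st, int64_t now_ms, int64_t timeout_ms)
{
    assert(st != NULL);
    if (timeout_ms <= 0) {
        return RECV_WAIT; /* timeout checking is disabled */
    }
    if (now_ms - st->last_recv_ms < timeout_ms / 2) {
        return RECV_WAIT; /* still inside the current half-timeout window */
    }
    if (!st->ping_sent) {
        /* First half elapsed in silence: ping and restart the window. */
        st->ping_sent = true;
        st->last_recv_ms = now_ms;
        return RECV_SEND_PING;
    }
    /* Second half elapsed with a ping outstanding: treat the link as broken. */
    return RECV_TIMED_OUT;
}

/* Assumed behavior: any message from the server clears the outstanding ping. */
static void note_message_received(RecvState *st, int64_t now_ms)
{
    assert(st != NULL);
    st->last_recv_ms = now_ms;
    st->ping_sent = false;
}
```

With a 10000 ms timeout and the last message at time 0, this sketch asks for a ping at 5000 ms and reports a timeout at 10000 ms if nothing arrives in between.
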
checkForReceiveTimeout calls sendReplyToSender to send the message to the primary, with the parameter replyRequested=true.
/*
 * @@GaussDB@@
 * Brief : Send reply to Sender task.
 * Description :
 * replyRequested is used to decide whether any immediate reply is expected
 * from sender
 * Notes :
 */
static bool sendReplyToSender(PGconn* conn, TimestampTz nowtime, bool replyRequested)
{
    /* Time to send feedback! */
    char replybuf[sizeof(StandbyReplyMessage) + 1];
    StandbyReplyMessage* replymsg = (StandbyReplyMessage*)(replybuf + 1);
    replymsg->receive = InvalidXLogRecPtr;
    replymsg->write = InvalidXLogRecPtr;
    if (reportFlushPosition)
        replymsg->flush = lastFlushPosition;
    else
        replymsg->flush = InvalidXLogRecPtr;
    replymsg->apply = InvalidXLogRecPtr;
    replymsg->sendTime = nowtime;
    replymsg->replyRequested = replyRequested;
    replymsg->peer_role = STANDBY_MODE;
    replymsg->peer_state = BUILDING_STATE;
    replybuf[0] = 'r';
    if (PQputCopyData(conn, replybuf, sizeof(replybuf)) <= 0 || PQflush(conn)) {
        pg_log(PG_PRINT, _("%s: could not send feedback packet: %s"), progname, PQerrorMessage(conn));
        return false;
    }
    return true;
}
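
The framing used above is a one-byte message type followed by the raw payload struct, and the receiver looks at the first byte to decide how to read the rest. A minimal sketch of that layout follows. DemoReply, build_reply_frame and parse_reply_frame are hypothetical names, and DemoReply is a reduced stand-in for StandbyReplyMessage, which has more fields.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical, reduced payload; the real StandbyReplyMessage has more fields. */
typedef struct {
    uint64_t flush;       /* last flushed WAL position, 0 if not reported */
    int64_t send_time;    /* sender's timestamp */
    bool reply_requested; /* true asks the peer to answer immediately */
} DemoReply;

#define DEMO_REPLY_FRAME_SIZE (sizeof(DemoReply) + 1)

/* Write 'r' + payload into out; returns bytes written, or 0 if out is too small. */
static size_t build_reply_frame(unsigned char *out, size_t out_len, const DemoReply *msg)
{
    assert(out != NULL && msg != NULL);
    if (out_len < DEMO_REPLY_FRAME_SIZE) {
        return 0;
    }
    out[0] = 'r'; /* message type byte */
    memcpy(out + 1, msg, sizeof(DemoReply));
    return DEMO_REPLY_FRAME_SIZE;
}

/* Read a frame back; returns false on a wrong type byte or a wrong length. */
static bool parse_reply_frame(const unsigned char *in, size_t in_len, DemoReply *msg)
{
    assert(in != NULL && msg != NULL);
    if (in_len != DEMO_REPLY_FRAME_SIZE || in[0] != 'r') {
        return false;
    }
    memcpy(msg, in + 1, sizeof(DemoReply));
    return true;
}
```

The sketch copies the payload with memcpy instead of casting out + 1 to a struct pointer, which sidesteps alignment concerns on the offset buffer. That is a choice made for this example, not a statement about how the original code should be written.
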
Because replybuf[0] is set to 'r', the primary dispatches the message to ProcessStandbyReplyMessage when it receives it:
switch (msgtype) {
    case 'r':
        ProcessStandbyReplyMessage();
        break;
    case 'h':
        ProcessStandbyHSFeedbackMessage();
        break;
    case 's':
        ProcessStandbySwitchRequestMessage();
        break;
    case 'A':
        ProcessStandbyFileTimeMessage();
        break;
    case 'a':
        ProcessArchiveFeedbackMessage();
        break;
    case 'S':
        ProcessHadrSwitchoverMessage();
        break;
    case 'R':
        ProcessHadrReplyMessage();
        break;
    default:
        ereport(COMMERROR,
            (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected message type \"%d\"", msgtype)));
        proc_exit(0);
}
Because reply.replyRequested = true, the primary sends a keepalive message back to the standby with requestReply=false, and sets the message type t_thrd.walsender_cxt.output_xlog_message[0] = 'k':
/* send a reply if the standby requested one */
if (reply.replyRequested) {
    WalSndKeepalive(false);
}
When gs_basebackup receives the keepalive message from the primary and the message has the expected size, it skips the message in the stream and prints the log line "keepalive message is received\n":
if (copybuf[0] == 'k') {
    /*
     * keepalive message, sent in 9.2 and newer. We just ignore
     * this message completely, but need to skip past it in the
     * stream.
     */
    if (r != STREAMING_KEEPALIVE_SIZE) {
        pg_log(PG_WARNING, _(" keepalive message is incorrect size: %d\n"), r);
        goto error;
    }
    fprintf(stderr, "%100s", "");
    fprintf(stderr, "\r");
    pg_log(PG_PRINT, _(" keepalive message is received\n"));
    /* copy the received buffer to keepalive */
    ret = memcpy_s(&keepalive, sizeof(PrimaryKeepaliveMessage), copybuf + 1, sizeof(PrimaryKeepaliveMessage));
    securec_check(ret, "\0", "\0");
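To sum up, the "keepalive message is received" line that brm keeps printing in the backup log is correct behavior while the backup is fetching WAL. When the primary has no new WAL to push, brm exchanges heartbeat messages with the primary to confirm that the connection is still healthy, and this line is the output of that exchange. With the default standby_message_timeout of 10 seconds, the line is printed at intervals of 5 seconds or more. It needs no special attention.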
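
Why the interval is "5 seconds or more" rather than exactly 5 seconds can be checked with a small simulation of an idle stream. This is a sketch under stated assumptions, not a model of the real code paths. It assumes that the primary is healthy and always answers a ping after a fixed round trip rtt_ms, that receiving the keepalive refreshes the last-receive time (not shown in the excerpts above), and that time advances in 1 ms steps. KeepaliveStats and simulate_idle_stream are hypothetical names.

```c
#include <assert.h>
#include <stdint.h>

typedef struct {
    int count;          /* keepalive log lines printed during the idle window */
    int64_t min_gap_ms; /* smallest gap between two lines, -1 if fewer than two */
} KeepaliveStats;

/* Simulate idle_ms of silence from the primary, stepping 1 ms at a time. */
static KeepaliveStats simulate_idle_stream(int64_t idle_ms, int64_t timeout_ms, int64_t rtt_ms)
{
    assert(timeout_ms > 0 && rtt_ms >= 0 && rtt_ms < timeout_ms / 2);
    KeepaliveStats stats = {0, -1};
    int64_t last_recv_ms = 0;  /* last message received, or last ping sent */
    int64_t last_log_ms = -1;  /* time of the previous keepalive log line */
    int64_t reply_due_ms = -1; /* when the keepalive arrives, -1 if no ping is outstanding */

    for (int64_t now = 0; now <= idle_ms; now++) {
        /* Client side: half the timeout passed in silence, so send a ping. */
        if (reply_due_ms < 0 && now - last_recv_ms >= timeout_ms / 2) {
            reply_due_ms = now + rtt_ms;
            last_recv_ms = now;
        }
        /* The primary's keepalive arrives: log it and restart the window. */
        if (reply_due_ms >= 0 && now >= reply_due_ms) {
            if (last_log_ms >= 0 && (stats.min_gap_ms < 0 || now - last_log_ms < stats.min_gap_ms)) {
                stats.min_gap_ms = now - last_log_ms;
            }
            last_log_ms = now;
            stats.count++;
            last_recv_ms = now;
            reply_due_ms = -1;
        }
    }
    return stats;
}
```

Under these assumptions, a 10000 ms timeout with a zero round trip gives a gap of exactly 5000 ms between log lines, and a 200 ms round trip stretches it to 5200 ms. The gap is half the timeout plus the time the exchange takes, so 5 seconds is its lower bound.
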




