暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

海山数据库(He3DB)源码详解:主备复制stop过程

Xiaye_DB 2024-10-21
56

海山数据库(He3DB)源码详解:主备复制stop过程

基础备份——pg_backup_stop

pg_backup_stop函数表示停止基础备份。

123.jpg

  1. 参数解析
    从函数参数中获取waitforarchive标志,该标志指示是否等待WAL归档完成后再停止备份
Datum pg_backup_stop(PG_FUNCTION_ARGS) { #define PG_STOP_BACKUP_V2_COLS 3 // 定义返回行的元组描述符、值数组和空值数组 TupleDesc tupdesc; Datum values[PG_STOP_BACKUP_V2_COLS]; bool nulls[PG_STOP_BACKUP_V2_COLS]; //从函数参数中获取是否等待WAL归档的标志 bool waitforarchive = PG_GETARG_BOOL(0); XLogRecPtr stoppoint; SessionBackupState status = get_backup_status();
  1. 检查返回值
    验证函数的返回类型是否为复合类型(即行类型),以确保能够正确构造并返回包含多个字段的元组
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type");
  1. 初始化返回值
    初始化用于存储元组描述符(tupdesc)、值数组(values)和空值数组(nulls)的变量,并将valuenulls数组清零
MemSet(values, 0, sizeof(values)); MemSet(nulls, 0, sizeof(nulls));
  1. 检查备份状态
    调用get_backup_status函数获取当前备份状态,若不是SESSION_BACKUP_RUNNING,则报告错误。
if (status != SESSION_BACKUP_RUNNING) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("backup is not in progress"), errhint("Did you call pg_backup_start()?")))
  1. 停止备份
    调用do_pg_backup_stop函数停止备份。该函数会处理停止备份所需的所有底层操作,如更新WAL记录、清理资源等。
stoppoint = do_pg_backup_stop(label_file->data, waitforarchive, NULL);
  1. 设置返回值
    将备份停止时的LSN(stoppoint)转换为Datum类型,并存储在values数组的第一个位置。
    将标签文件和表空间映射文件的内容(label_file->datatblspc_map_file->data)转换为Datum类型(使用CStringGetTextDatum函数),并分别存储在values数组的第二个和第三个位置。
values[0] = LSNGetDatum(stoppoint); values[1] = CStringGetTextDatum(label_file->data); values[2] = CStringGetTextDatum(tblspc_map_file->data);
  1. 释放资源
    释放之前为标签文件和表空间映射文件分配的内存,这包括文件内容本身和可能的字符串信息结构,将label_filetblspc_map_file指针设置为NULL,以避免潜在的内存泄漏
pfree(label_file->data); pfree(label_file); label_file = NULL; pfree(tblspc_map_file->data); pfree(tblspc_map_file); tblspc_map_file = NULL;
  1. 构造并返回元组
    使用heap_form_tuple函数,根据元组描述符(tupdesc)、值数组(values)和空值数组(nulls)构造一个元组。将构造好的元组转换为Datum类型,并使用PG_RETURN_DATUM宏返回给调用者
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));

基础备份——do_pg_backup_stop

do_pg_backup_stop函数表示完成在线备份过程。

  1. 状态检查
    检查状态是否在恢复模式中,如果不是并且当前WAL级别不足以支持在线备份!XLogIsNeeded(),则报错
XLogRecPtr do_pg_backup_stop(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) { bool backup_started_in_recovery = false; XLogRecPtr startpoint; XLogRecPtr stoppoint; TimeLineID stoptli; pg_time_t stamp_time; char strfbuf[128]; char histfilepath[MAXPGPATH]; char startxlogfilename[MAXFNAMELEN]; char stopxlogfilename[MAXFNAMELEN]; char lastxlogfilename[MAXFNAMELEN]; char histfilename[MAXFNAMELEN]; char backupfrom[20]; XLogSegNo _logSegNo; FILE *fp; char ch; int seconds_before_warning; int waits = 0; bool reported_waiting = false; char *remaining; char *ptr; uint32 hi, lo; backup_started_in_recovery = RecoveryInProgress(); if (!backup_started_in_recovery && !XLogIsNeeded()) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("WAL level not sufficient for making an online backup"), errhint("wal_level must be set to \"replica\" or \"logical\" at server start.")));
  1. 获取WAL插入锁
    通过WALInsertLockAcquireExclusive();获取WAL插入的独占锁,以确保在更新备份计数器、强制页面写入和会话级别锁时的一致性
WALInsertLockAcquireExclusive();
  1. 更新备份计数器并清理会话级别锁
    检查并更新XLogCtl->Insert.runningBackups计数器,每次调用do_pg_backup_start时,该计数器增加;每次调用do_pg_backup_stop时,该计数器减少,在释放WAL插入锁之前,清理会话级别的备份状态(sessionBackupState = SESSION_BACKUP_NONE)。这是因为会话级别的锁可能在执行过程中被中断(通过CHECK_FOR_INTERRUPTS()),而清理会话级别锁需要在中断检查之前完成
Assert(XLogCtl->Insert.runningBackups > 0); XLogCtl->Insert.runningBackups--; if (XLogCtl->Insert.runningBackups == 0) { XLogCtl->Insert.forcePageWrites = false; } sessionBackupState = SESSION_BACKUP_NONE;
  1. 释放WAL插入锁
    释放WAL插入锁
WALInsertLockRelease();
  1. 解析备份标签文件labelfile
    使用sscanf函数从标签文件中读取特定的信息,包括WAL日志的起始位置,WAL文件名,以及一个检查字符ch
if (sscanf(labelfile, "START WAL LOCATION: %X/%X (file %24s)%c", &hi, &lo, startxlogfilename, &ch) != 4 || ch != '\n') ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE))); startpoint = ((uint64) hi) << 32 | lo; remaining = strchr(labelfile, '\n') + 1;
  1. 解析备份来源
    解析BACKUP FROM行,检查是否从备库备份,如果解析失败,则报告错误;如果声明从备库备份但实际上没有在恢复模式,则报告错误
ptr = strstr(remaining, "BACKUP FROM:"); if (!ptr || sscanf(ptr, "BACKUP FROM: %19s\n", backupfrom) != 1) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE))); if (strcmp(backupfrom, "standby") == 0 && !backup_started_in_recovery) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("the standby was promoted during online backup"), errhint("This means that the backup being taken is corrupt " "and should not be used. " "Try taking another online backup.")));
  1. 处理恢复模式下的备份
    如果备份在恢复模式下开始backup_started_in_recovery为真,则进行以下操作:检查在备份期间是否有不包含全页写full-page writes的WAL被重放。如果是,则报告错误,因为这会导致备份损坏。读取并设置备份结束的位置stoppoint和时间线IDstoptli,这些信息从控制文件中获取
if (backup_started_in_recovery) { XLogRecPtr recptr; /* * Check to see if all WAL replayed during online backup contain * full-page writes. */ SpinLockAcquire(&XLogCtl->info_lck); recptr = XLogCtl->lastFpwDisableRecPtr; SpinLockRelease(&XLogCtl->info_lck); if (startpoint <= recptr) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("WAL generated with full_page_writes=off was replayed " "during online backup"), errhint("This means that the backup being taken on the standby " "is corrupt and should not be used. " "Enable full_page_writes and run CHECKPOINT on the primary, " "and then try an online backup again."))); LWLockAcquire(ControlFileLock, LW_SHARED); stoppoint = ControlFile->minRecoveryPoint; stoptli = ControlFile->minRecoveryPointTLI; LWLockRelease(ControlFileLock); }
  1. 处理非恢复模式下的备份
    如果不是在恢复模式下开始备份,则进行以下操作:写入备份结束的WAL记录。强制切换到新的WAL段文件,以确保备份在WAL段文件被归档后立即可用。计算并记录备份结束时的WAL文件名stopxlogfilename
    格式化并记录备份的结束时间(使用日志时区)。写入备份历史文件,包含开始和结束的WAL位置、时间、时间线ID等信息,以及标签文件的剩余内容。清理不再需要的备份历史文件,并发布新的历史文件可归档的通知
else { /* * Write the backup-end xlog record */ XLogBeginInsert(); XLogRegisterData((char *) (&startpoint), sizeof(startpoint)); stoppoint = XLogInsert(RM_XLOG_ID, XLOG_BACKUP_END); /* * Given that we're not in recovery, InsertTimeLineID is set and can't * change, so we can read it without a lock. */ stoptli = XLogCtl->InsertTimeLineID; /* * Force a switch to a new xlog segment file, so that the backup is * valid as soon as archiver moves out the current segment file. */ RequestXLogSwitch(false); XLByteToPrevSeg(stoppoint, _logSegNo, wal_segment_size); XLogFileName(stopxlogfilename, stoptli, _logSegNo, wal_segment_size); /* Use the log timezone here, not the session timezone */ stamp_time = (pg_time_t) time(NULL); pg_strftime(strfbuf, sizeof(strfbuf), "%Y-%m-%d %H:%M:%S %Z", pg_localtime(&stamp_time, log_timezone)); XLByteToSeg(startpoint, _logSegNo, wal_segment_size); BackupHistoryFilePath(histfilepath, stoptli, _logSegNo, startpoint, wal_segment_size); fp = AllocateFile(histfilepath, "w"); if (!fp) ereport(ERROR, (errcode_for_file_access(), errmsg("could not create file \"%s\": %m", histfilepath))); fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n", LSN_FORMAT_ARGS(startpoint), startxlogfilename); fprintf(fp, "STOP WAL LOCATION: %X/%X (file %s)\n", LSN_FORMAT_ARGS(stoppoint), stopxlogfilename); /* * Transfer remaining lines including label and start timeline to * history file. */ fprintf(fp, "%s", remaining); fprintf(fp, "STOP TIME: %s\n", strfbuf); fprintf(fp, "STOP TIMELINE: %u\n", stoptli); if (fflush(fp) || ferror(fp) || FreeFile(fp)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not write file \"%s\": %m", histfilepath))); /* * Clean out any no-longer-needed history files. As a side effect, * this will post a .ready file for the newly created history file, * notifying the archiver that history file may be archived * immediately. */ CleanupBackupHistory(); }
  1. 条件检查
    如果备份不是在恢复模式下开始的!backup_started_in_recovery且WAL归档是活跃的XLogArchivingActive(),或者
    如果备份是在恢复模式下开始的backup_started_in_recovery且WAL归档总是被激活的XLogArchivingAlways()
    则继续执行等待WAL归档的逻辑。
  2. 准备文件名和位置
    使用XLByteToPrevSeg函数将停止点stoppoint转换为WAL段号和偏移量(实际存储在_logSegNo中,但此变量未在代码段中显式使用其转换后的值,可能用于其他目的)。
    使用XLogFileName函数根据时间线IDstoptli、段号和WAL段大小生成最后一个WAL文件的名称lastxlogfilename
    使用XLByteToSeg函数将起始点startpoint也转换为WAL段号和偏移量(同样,这里可能仅使用段号)。
    使用BackupHistoryFileName函数生成备份历史文件的名称histfilename,该文件记录了备份的相关信息,包括起始点等。
  3. 等待WAL归档
    初始化等待警告前的秒数seconds_before_warning为60秒,并设置等待计数器waits为0。
    进入一个循环,检查lastxlogfilenamehistfilename指定的WAL文件和备份历史文件是否仍被标记为“忙碌”(即是否仍在归档过程中)如果是的话,则进入循环:如果检测到中断请求(如用户取消操作),则处理这些请求。如果等待时间超过5秒且尚未报告等待状态,则发出通知告知用户正在等待WAL归档完成。使用WaitLatch函数等待一段时间(1000毫秒/1秒),同时允许处理其他信号或超时。
    重置闩锁状态以准备下一次等待。
    如果等待时间达到或超过seconds_before_warning,则发出警告,并指数增加seconds_before_warning的值(以避免在短时间内重复发出大量警告)。
  4. 归档完成
    一旦所有必要的WAL文件和备份历史文件都不再被标记为“忙碌”,即所有文件都已归档完成,循环结束。发出通知告知用户所有必要的WAL段都已归档
  5. 处理未启用WAL归档的情况
    如果waitforarchive为真但WAL归档未启用,则发出通知提醒用户必须确保所有必要的WAL段通过其他方式被复制以完成备份
  6. 返回结果
    如果提供了指向时间线ID指针的变量stoptli_p,则将其设置为停止时间线IDstoptli
    返回停止点stoppoint,即备份结束时的WAL位置

基础备份——WaitLatch

  1. 参数验证和准备
    检查是否在主进程Postmaster的控制下运行,并确保对于主进程死亡的处理方式是恰当的。如果不在主进程控制下,或者已经设置了处理主进程死亡的方式(通过wakeEvents参数中的 WL_EXIT_ON_PM_DEATHWL_POSTMASTER_DEATH),则继续执行
    准备等待事件和相关的闩锁对象。如果wakeEvents参数中没有设置WL_LATCH_SET标志,表示不需要等待特定的闩锁事件,此时将latch设置为NULL。同时,更新全局的等待事件集合(LatchWaitSet)中的闩锁和主进程死亡处理标志
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info) { WaitEvent event;
  1. 等待事件
    调用WaitEventSetWait函数,将LatchWaitSet设置为等待状态,并指定超时时间(如果wakeEvents中包含了WL_TIMEOUT标志,则使用timeout参数作为超时时间;否则,使用 -1 表示无限等待)。同时,准备一个WaitEvent结构体来接收触发的事件信息。
    WaitEventSetWait 函数会阻塞当前进程,直到以下条件之一发生:
    指定的闩锁(如果设置了WL_LATCH_SET)被设置。
    超时时间到达(如果设置了超时)。
    主进程死亡(如果设置了处理主进程死亡的方式)。
    其他在LatchWaitSet中注册的事件被触发。
if (WaitEventSetWait(LatchWaitSet, (wakeEvents & WL_TIMEOUT) ? timeout : -1, &event, 1, wait_event_info) == 0)
  1. 处理结果
    WaitEventSetWait返回后,检查返回值以确定等待是否因为超时而结束。如果是(返回值为 0),则函数返回WL_TIMEOUT,表示等待因为超时而中断。
    如果不是因为超时结束,那么从event结构体中获取触发的事件(event.events),并返回该值。这个值包含了触发等待结束的具体事件信息,如WL_LATCH_SET表示闩锁被设置,WL_TIMEOUT表示超时等。但在这里,由于超时情况已经被单独处理,所以这里返回的event.events更可能是闩锁被设置或其他在LatchWaitSet中注册的事件。
if (WaitEventSetWait(...) == 0) return WL_TIMEOUT; else return event.events;

作者介绍

周雨慧 中移(苏州)软件技术有限公司 数据库内核开发工程师

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论