
PostgreSQL startup logic and an analysis of slow startup caused by spill files

Original article by liuzhilong62, 2025-01-02

Problem: slow database startup

Version: PostgreSQL 13.2

The database was very slow to start: the startup process was reading spill files, and the file name it was working on kept changing. Even listing the spill files was slow; when ls -l finally finished, there were about 8 million spill files.

Why are there tens of millions of spill files?

WAL segments and the meaning of LSN

LSN

An LSN is a 64-bit bigint. It is usually displayed like 42D3B/1732C540 (hex): the part before the slash is the 32-bit logical log number, and the 32 bits after the slash encode the segment number, block number, and offset within the block. The four parts are:

32 bits              8 bits               11 bits        13 bits
logical log number   log segment number   block number   offset within block

Offset within block: 8192 = 2^13 (the block size), hence 13 bits.

Block number: 16MB (the default WAL segment size) / 8192 = 2048 = 2^11 blocks, hence 11 bits.
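
As a quick sanity check, the low 32 bits of an LSN can be split into these fields with plain integer arithmetic. A minimal SQL sketch, assuming the default 16MB WAL segment size and 8KB blocks:

-- decompose the low 32 bits of LSN 42D3B/1732C540 (0x1732C540)
select x'1732C540'::int / (16*1024*1024)            as segment_no_in_log,   -- 0x17 = 23  (the 8-bit field)
       (x'1732C540'::int % (16*1024*1024)) / 8192   as block_no,            -- 406        (the 11-bit field)
       x'1732C540'::int % 8192                      as offset_in_block;     -- 1344       (the 13-bit field)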

WAL segment

A WAL file name consists of three groups of hexadecimal digits.

Take the WAL file 0000000300042D3B00000002 as an example:

32 bits      32 bits              32 bits
timeline     logical log number   log segment number
00000003     00042D3B             00000002

As you can see, an LSN can pinpoint the WAL file name and the offset within that file.

Here, the part of the LSN before the slash is the logical log number, and the low bits after the slash contain the 8-bit log segment number; both will be used below.
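
PostgreSQL also exposes this mapping directly through SQL. A minimal sketch (on an instance with the default 16MB segments; note that pg_walfile_name computes the name for the current timeline, so the timeline part may differ from the example above):

-- map an LSN to its WAL file name and the byte offset within that file
select pg_walfile_name('42D3B/1732C540'::pg_lsn);
select pg_walfile_name_offset('42D3B/1732C540'::pg_lsn);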

Spill file name conversion

Replication slot name: logical_ex2209_rep

Spill file name: xid-407989064-lsn-42D1E-20000000.spill

42D1E is not a complete LSN, so pg_walfile_name cannot be used directly to locate the WAL file. 42D1E is a logical log number; simply filtering for WAL files whose names contain 42D1E returns 16 WAL files.

Can the number 20000000 be used to derive the WAL log segment number, and therefore the exact file?

The spill file name is built by ReorderBufferSerializedPath:

/*
 * Given a replication slot, transaction ID and segment number, fill in the
 * corresponding spill file into 'path', which is a caller-owned buffer of size
 * at least MAXPGPATH.
 */
static void
ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
                            XLogSegNo segno)
{
    XLogRecPtr  recptr;

    XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);

    snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
             NameStr(MyReplicationSlot->data.name),
             xid, (uint32) (recptr >> 32), (uint32) recptr);
}

pg_replslot/%s and xid-%u are easy to understand: the replication slot name and the xid. The recptr that follows deserves a closer look at its definition:

/*
 * Pointer to a location in the XLOG.  These pointers are 64 bits wide,
 * because we don't want them ever to overflow.
 */
typedef uint64 XLogRecPtr;

XLogSegNoOffsetToRecPtr computes an LSN from the WAL log segment number, the segment size, and an offset:

#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest) \
    (dest) = (segno) * (wal_segsz_bytes) + (offset)

XLogRecPtr is simply the LSN! So:

(uint32) (recptr >> 32) takes the high 32 bits of the LSN, and (uint32) recptr takes the low 32 bits.

The high 32 bits are the first half of the LSN we just saw, i.e. lsn-42D1E. The low 32 bits actually carry more information than we need; here only the segment number encoded in the leading bits of the low 32 bits matters.

Because offset=0 is passed in along with segno, no intra-segment offset information is needed to compute dest. The actual value of wal_segsz_bytes here is 128*1024*1024 (this instance uses 128MB WAL segments), so the formula in XLogSegNoOffsetToRecPtr can be rearranged as:

segno = dest / (128*1024*1024)
-- converting the hex value 20000000:
segno = x'20000000'::int / (128*1024*1024)
segno = 4

The formula yields the log segment number segno, which in turn locates the WAL file.

So the spill file xid-407989064-lsn-42D1E-20000000.spill corresponds to the following WAL file,

with logical log number = 42D1E and segment number = 04:

ls *42D1E*04
0000000200042D1E00000004

pg_waldump confirms that xid 407989064 appears in this WAL file.

In practice the WAL segment size is fixed once the instance is created, so (128*1024*1024) is a constant. That means segno is strictly determined by (uint32) recptr, although the two are not equal. In other words, every WAL segment switch starts a new spill file.
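
To make this concrete, here is a small sketch (assuming wal_segment_size = 128MB, as in this environment) showing how consecutive segment numbers map to the second hex field of the spill file name:

-- the second hex field of a spill file name is segno * wal_segment_size (offset = 0)
select n as segno,
       lpad(to_hex(n * 128*1024*1024), 8, '0') as spill_name_suffix
from generate_series(0, 4) as n;
-- 0 -> 00000000, 1 -> 08000000, 2 -> 10000000, 3 -> 18000000, 4 -> 20000000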

To summarize, the spill file generation rules are:

  • For the same transaction ID, crossing WAL segments produces multiple spill files. For example, a large transaction without subtransactions that spans 3 WAL segments maps to 3 spill files.
  • Different transaction IDs map to different spill files. For example, 10 million subtransactions map to 10 million spill files.

Structure of the spill file name xid-407989064-lsn-42D1E-20000000.spill:

xid             LSN high 32 bits (the WAL logical log number)   derived from the WAL log segment number (not equal to the segment number itself)
xid-407989064   lsn-42D1E                                       20000000
# the environment restored for analysis
[postgres]$ ll | head -100
total 40000276
-rw------- 1 postgres postgres 184 Dec  6 15:20 state
-rw------- 1 postgres postgres 196 Dec  6 13:25 xid-407989064-lsn-42D1E-0.spill
-rw------- 1 postgres postgres 208 Dec  6 13:25 xid-407989064-lsn-42D1E-20000000.spill
...
-rw------- 1 postgres postgres 540 Dec  6 16:44 xid-407989064-lsn-42D2A-D0000000.spill
-rw------- 1 postgres postgres 152 Dec  6 13:09 xid-407989065-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec  6 13:09 xid-407989066-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec  6 13:09 xid-407989068-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec  6 13:09 xid-407989070-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec  6 13:09 xid-407989072-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec  6 13:09 xid-407989076-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec  6 13:09 xid-407989079-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec  6 13:09 xid-407989080-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec  6 13:09 xid-407989082-lsn-42D1D-C8000000.spill

[postgres@lzlhost /myhost/liuzhilong/pg_replslot/logical_ex9e15_rep]$ ll | awk '{print $9}' | awk -F '-' '{print $2}' | sort | uniq -c | wc -l
10000003
[postgres@lzlhost /myhost/liuzhilong/pg_replslot/logical_ex9e15_rep]$ ll | wc -l
10000070

So in the real environment we saw 10,000,070 files, of which 10,000,003 had distinct xids. In other words, one parent transaction spanned roughly 70 WAL files, and that parent transaction had about 10 million subtransactions.

Replication slot spill tests

-- set up a publication/subscription replication link
logical_decoding_work_mem = 64MB     # pg_ctl reload
wal_segment_size = 128MB             # fixed at initdb time

-- source
CREATE TABLE replication_table (
    id BIGSERIAL PRIMARY KEY,
    column1 char(2000),
    column2 char(2000),
    column3 char(2000)
);
create publication pub_test for table replication_table;

-- dest
CREATE TABLE replication_table (
    id BIGSERIAL PRIMARY KEY,
    column1 char(2000),
    column2 char(2000),
    column3 char(2000)
);
CREATE SUBSCRIPTION sub_test CONNECTION 'host=127.0.0.1 port=8094 dbname=lzl user=lzl password=qwer' PUBLICATION pub_test;

-- source
select * from pg_replication_slots;

Test: large transaction, no subtransactions, replicated table

-- start a large transaction and leave it uncommitted for now
begin;
insert into replication_table(column1,column2,column3)
select 'a','b','c' from generate_series(1,1000000) g;

-- the replication slot spills to disk
ll
total 331924
-rw------- 1 postgres postgres       184 Dec  9 20:22 state
-rw------- 1 postgres postgres  88226964 Dec  9 20:22 xid-5074343-lsn-163-38000000.spill
-rw------- 1 postgres postgres 119698488 Dec  9 20:22 xid-5074343-lsn-163-40000000.spill

After the large transaction commits and the changes are consumed (replication lag drops to 0), the spill files disappear.

M=# select pid,usename,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,reply_time from pg_stat_replication;
  pid   | usename |   sent_lsn   |  write_lsn   |  flush_lsn   |  replay_lsn  | write_lag | flush_lag | replay_lag |          reply_time
--------+---------+--------------+--------------+--------------+--------------+-----------+-----------+------------+------------------------------
 163525 | lzl     | 163/4996E1C8 | 163/4996E1C8 | 163/4996E1C8 | 163/4996E1C8 | [null]    | [null]    | [null]     | 2024-12-09 20:25:35.14769+08
(1 row)

M=# select pid,usename,pg_wal_lsn_diff(pg_current_wal_lsn(),sent_lsn) diff_sent_mb,pg_wal_lsn_diff(pg_current_wal_lsn(),write_lsn) diff_write_mb,pg_wal_lsn_diff(pg_current_wal_lsn(),flush_lsn) diff_flush_mb,pg_wal_lsn_diff(pg_current_wal_lsn(),replay_lsn) diff_replay_mb,pg_walfile_name_offset(sent_lsn) sentoffset,pg_walfile_name_offset(write_lsn) writeoffset,pg_walfile_name_offset(flush_lsn) flush_lsn from pg_stat_replication;
  pid   | usename | diff_sent_mb | diff_write_mb | diff_flush_mb | diff_replay_mb |             sentoffset              |             writeoffset             |   flush_lsn
--------+---------+--------------+---------------+---------------+----------------+-------------------------------------+-------------------------------------+-------------------------------
 163525 | lzl     |            0 |             0 |             0 |              0 | (000000010000016300000009,26665416) | (000000010000016300000009,26665416) | (000000
(1 row)

[/mypg/pg8094/data/pg_replslot/sub_test]$ ll
total 357392
-rw------- 1 postgres postgres       184 Dec  9 20:23 state
-rw------- 1 postgres postgres  88226964 Dec  9 20:22 xid-5074343-lsn-163-38000000.spill
-rw------- 1 postgres postgres 137696328 Dec  9 20:23 xid-5074343-lsn-163-40000000.spill
-rw------- 1 postgres postgres  26076708 Dec  9 20:23 xid-5074343-lsn-163-48000000.spill

[/mypg/pg8094/data/pg_replslot/sub_test]$ ll
total 4
-rw------- 1 postgres postgres 184 Dec  9 20:25 state
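
A quick way to keep an eye on how far a slot's decoding position lags (and therefore how much change data may be accumulating in memory or in spill files) is to compare it against the current WAL position, for example:

-- how far each slot lags behind the current WAL insert position (a monitoring sketch)
select slot_name,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn))         as restart_lag,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) as confirmed_flush_lag
from pg_replication_slots;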

Test: large transaction, no subtransactions, non-replicated table

-- source: create an unrelated table (not in the publication) and prepare to write to it
CREATE TABLE no_replication_table (
    id BIGSERIAL PRIMARY KEY,
    column1 char(2000),
    column2 char(2000),
    column3 char(2000)
);

-- start a large transaction and leave it uncommitted for now
begin;
insert into no_replication_table(column1,column2,column3)
select 'a','b','c' from generate_series(1,1000000) g;

-- the slot still spills
[postgres@lzldb:MYINST:8094 /mypg/pg8094/data/pg_replslot/sub_test]$ ll
total 357492
-rw------- 1 postgres postgres       184 Dec  9 20:09 state
-rw------- 1 postgres postgres 107511456 Dec  9 20:08 xid-5074106-lsn-163-28000000.spill
-rw------- 1 postgres postgres 137698804 Dec  9 20:09 xid-5074106-lsn-163-30000000.spill
-rw------- 1 postgres postgres   4308444 Dec  9 20:09 xid-5074106-lsn-163-38000000.spill

Test: large transaction, with subtransactions, non-replicated table

# one row per insert, one subtransaction per insert
echo "begin;" > subtx.sql
for i in {1..1000000}
do
  echo "savepoint p$i;" >> subtx.sql
  echo "insert into no_replication_table(column1,column2,column3) select 'a','b','c';" >> subtx.sql
done
nohup psql -d lzl -f subtx.sql &
# while the script is running, 800k+ spill files are observed
[/myhost/pg8094/data/pg_replslot/sub_test]$ ll | wc -l
823749
[/myhost/pg8094/data/pg_replslot/sub_test]$ ll | head -10
total 1099532
-rw------- 1 postgres postgres  184 Dec  9 21:10 state
-rw------- 1 postgres postgres 1236 Dec  9 21:10 xid-5519686-lsn-163-70000000.spill
-rw------- 1 postgres postgres  252 Dec  9 21:09 xid-5519687-lsn-163-70000000.spill
-rw------- 1 postgres postgres  252 Dec  9 21:09 xid-5519688-lsn-163-70000000.spill
-rw------- 1 postgres postgres  252 Dec  9 21:09 xid-5519689-lsn-163-70000000.spill
-rw------- 1 postgres postgres  252 Dec  9 21:09 xid-5519690-lsn-163-70000000.spill
-rw------- 1 postgres postgres  252 Dec  9 21:09 xid-5519691-lsn-163-70000000.spill
-rw------- 1 postgres postgres  252 Dec  9 21:09 xid-5519692-lsn-163-70000000.spill
-rw------- 1 postgres postgres  252 Dec  9 21:09 xid-5519693-lsn-163-70000000.spill

Analysis of the slow startup

startup process flow during database startup

The startup flow is walked through frame by frame, using the stack frame numbers:

11: main: nothing special here.

10: PostmasterMain

Before entering the main loop, it kicks off the startup flow with StartupPID = StartupDataBase();, which is essentially a call to StartChildProcess(StartupProcess):

#define StartupDataBase() StartChildProcess(StartupProcess)

9: StartChildProcess: forks a process. This becomes an auxiliary process of the postmaster; all regular auxiliary child processes go through this logic and are forked at this step. Here the argument is AuxProcType = StartupProcess.

8: AuxiliaryProcessMain

Because MyAuxProcType = StartupProcess, it takes the StartupProcessMain path, which differs from the paths of child processes such as walsender, walwriter, and bgwriter. The startup process exists primarily to read WAL for crash recovery, but it does quite a lot of other work as well:

switch (MyAuxProcType)
{
    case CheckerProcess:
        /* don't set signals, they're useless here */
        CheckerModeMain();
        proc_exit(1);       /* should never return */

    case BootstrapProcess:
        /*
         * There was a brief instant during which mode was Normal; this is
         * okay.  We need to be in bootstrap mode during BootStrapXLOG for
         * the sake of multixact initialization.
         */
        SetProcessingMode(BootstrapProcessing);
        bootstrap_signals();
        BootStrapXLOG();
        BootstrapModeMain();
        proc_exit(1);       /* should never return */

    case StartupProcess:    /* <-- this is the branch taken here */
        /* don't set signals, startup process has its own agenda */
        StartupProcessMain();
        proc_exit(1);       /* should never return */

    case BgWriterProcess:
        /* don't set signals, bgwriter has its own agenda */
        BackgroundWriterMain();
        proc_exit(1);       /* should never return */

    case CheckpointerProcess:
        /* don't set signals, checkpointer has its own agenda */
        CheckpointerMain();
        proc_exit(1);       /* should never return */

    case WalWriterProcess:
        /* don't set signals, walwriter has its own agenda */
        InitXLOGAccess();
        WalWriterMain();
        proc_exit(1);       /* should never return */

    case WalReceiverProcess:
        /* don't set signals, walreceiver has its own agenda */
        WalReceiverMain();
        proc_exit(1);       /* should never return */

    default:
        elog(PANIC, "unrecognized process type: %d", (int) MyAuxProcType);
        proc_exit(1);
}

7: StartupProcessMain: mainly exists to call StartupXLOG().

6: StartupXLOG

Function comment:

This must be called ONCE during postmaster or standalone-backend startup

StartupXLOG is always called by the postmaster during startup, regardless of whether the previous shutdown was a crash or a clean, consistent shutdown.

switch (ControlFile->state)
{
    ...
    case DB_IN_PRODUCTION:
        ereport(LOG,
                (errmsg("database system was interrupted; last known up at %s",
                        str_time(ControlFile->time))));
        break;

This matches the server log. Here is the log output for the shutdown and the subsequent startup:

2024-12-06 17:02:57.534 CST,,,447560,,65693cde.6d448,1325,,2023-12-01 09:54:38 CST,,0,LOG,00000,"database system is shut down",,,,,,,,,"","postmaster"
2024-12-06 17:03:49.536 CST,,,211844,,6752bdf3.33b84,1,,2024-12-06 17:03:47 CST,,0,LOG,00000,"ending log output to stderr",,"Future log output will go to log destination ""csvlog"".",,,,,,,"","postmaster"
2024-12-06 17:03:49.536 CST,,,211844,,6752bdf3.33b84,2,,2024-12-06 17:03:47 CST,,0,LOG,00000,"starting PostgreSQL 13.2 (RaseSQL 1.3) on x86_64-pc-linux-gnu, compiled by gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-39.0.1), 64-bit",,,,,,,,,"","postmaster"
2024-12-06 17:03:49.537 CST,,,211844,,6752bdf3.33b84,3,,2024-12-06 17:03:47 CST,,0,LOG,00000,"listening on IPv4 address ""0.0.0.0"", port 7284",,,,,,,,,"","postmaster"
2024-12-06 17:03:49.539 CST,,,211844,,6752bdf3.33b84,4,,2024-12-06 17:03:47 CST,,0,LOG,00000,"listening on Unix socket ""/tmp/.s.PGSQL.7284""",,,,,,,,,"","postmaster"
2024-12-06 17:03:49.557 CST,,,211995,,6752bdf5.33c1b,1,,2024-12-06 17:03:49 CST,,0,LOG,00000,"database system was interrupted; last known up at 2024-12-06 17:00:10 CST",,,,,,,,,"","startup"

So after that shutdown, the database state recorded in the control file was in production:

Database cluster state: in production

The in production state means the database was considered to be running rather than cleanly shut down, which shows that the shutdown at the time was not a clean (consistent) shutdown.

Continuing with the key code related to fsync:

/*----------
 * If we previously crashed, perform a couple of actions:
 *
 * - The pg_wal directory may still include some temporary WAL segments
 *   used when creating a new segment, so perform some clean up to not
 *   bloat this path.  This is done first as there is no point to sync
 *   this temporary data.
 *
 * - There might be data which we had written, intending to fsync it, but
 *   which we had not actually fsync'd yet.  Therefore, a power failure in
 *   the near future might cause earlier unflushed writes to be lost, even
 *   though more recent data written to disk from here on would be
 *   persisted.  To avoid that, fsync the entire data directory.
 */
if (ControlFile->state != DB_SHUTDOWNED &&
    ControlFile->state != DB_SHUTDOWNED_IN_RECOVERY)
{
    RemoveTempXlogFiles();
    SyncDataDirectory();
}

Because the state recorded in the control file is not that of a clean shutdown, we enter the if branch and call SyncDataDirectory() to fsync everything to disk.

StartupXLOG does a great many things; besides SyncDataDirectory(), the other spill-related call is StartupReorderBuffer():

/*
 * Initialize replication slots, before there's a chance to remove
 * required resources.
 */
StartupReplicationSlots();

/*
 * Startup logical state, needs to be setup now so we have proper data
 * during crash recovery.
 */
StartupReorderBuffer();

StartupReorderBuffer is always called as well; it calls ReorderBufferCleanupSerializedTXNs to clean up the spill files in every slot directory (it does not remove the slot directories or their state files):

/*
 * Delete all data spilled to disk after we've restarted/crashed. It will be
 * recreated when the respective slots are reused.
 */
void
StartupReorderBuffer(void)
{
    DIR        *logical_dir;
    struct dirent *logical_de;

    logical_dir = AllocateDir("pg_replslot");
    while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
    {
        if (strcmp(logical_de->d_name, ".") == 0 ||
            strcmp(logical_de->d_name, "..") == 0)
            continue;

        /* if it cannot be a slot, skip the directory */
        if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
            continue;

        /*
         * ok, has to be a surviving logical slot, iterate and delete
         * everything starting with xid-*
         */
        ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
    }
    FreeDir(logical_dir);
}

5: SyncDataDirectory

The comment on this function is very important:

/*
 * Issue fsync recursively on PGDATA and all its contents.
 *
 * We fsync regular files and directories wherever they are, but we
 * follow symlinks only for pg_wal and immediately under pg_tblspc.
 * Other symlinks are presumed to point at files we're not responsible
 * for fsyncing, and might not have privileges to write at all.
 *
 * Errors are logged but not considered fatal; that's because this is used
 * only during database startup, to deal with the possibility that there are
 * issued-but-unsynced writes pending against the data directory.  We want to
 * ensure that such writes reach disk before anything that's done in the new
 * run.  However, aborting on error would result in failure to start for
 * harmless cases such as read-only files in the data directory, and that's
 * not good either.
 *
 * Note that if we previously crashed due to a PANIC on fsync(), we'll be
 * rewriting all changes again during recovery.
 *
 * Note we assume we're chdir'd into PGDATA to begin with.
 */
  • fsync every file under the data directory so it is persisted
  • this only happens during database startup
  • its purpose is to guarantee that the data directory is fully persisted before the database starts running

The body of SyncDataDirectory recursively walks the directories and fsyncs each entry (symlinks get slightly special treatment):

walkdir(".", datadir_fsync_fname, false, LOG); if (xlog_is_symlink) walkdir("pg_wal", datadir_fsync_fname, false, LOG); walkdir("pg_tblspc", datadir_fsync_fname, true, LOG);

4: walkdir: recursing into .

3: walkdir: recursing into ./pg_replslot

2: walkdir: recursing into ./pg_replslot/slotname

1: lstat: a C library call. walkdir not only performs the fsync (via the callback datadir_fsync_fname); the walkdir function itself also calls lstat to get file metadata such as the inode, file size, and last modification time, similar to the Linux stat command.

0: _lxstat: a C library call.

Startup logic summary

  • PostgreSQL launches an auxiliary process called startup to drive database startup. Unlike the usual child processes (walwriter, bgwriter, checkpointer, and so on), it is always started during startup, and it does a lot of work.
  • StartupXLOG is always called at startup, whether or not the previous shutdown was clean.
  • Only after a non-clean shutdown does it trigger SyncDataDirectory.
  • SyncDataDirectory fsyncs every file under the data directory to persist it, and also stats every file.
  • The fsync guarantees that the data files are consistent before the database starts; the stat is presumably to verify that the files are normal and readable (before the startup process starts, only the readability of the data directory itself has been checked).
  • Regardless of the shutdown state, StartupReorderBuffer is always called and cleans up the spill files of every replication slot.

When does the database become ready?

After the startup process finishes its work, the database is still not in the ready state. pmState is the state machine that controls startup and shutdown; at this point it records PM_STARTUP. When the state machine needs to change state, the reaper function is invoked to reap child processes; reaper exists to perform cleanup, or follow-up launch work, after a child process exits.

The last few steps of PostmasterMain:

StartupPID = StartupDataBase();
Assert(StartupPID != 0);
StartupStatus = STARTUP_RUNNING;
pmState = PM_STARTUP;           /* the state machine changes state here */

/* Some workers may be scheduled to start now */
maybe_start_bgworkers();

status = ServerLoop();

/*
 * ServerLoop probably shouldn't ever return, but if it does, close down.
 */
ExitPostmaster(status != STATUS_OK);

abort();                        /* not reached */
}

The core startup flow in PostmasterMain eventually reaches reaper to handle the normal exit of the startup process.

The pmState comment:

/*
 * We use a simple state machine to control startup, shutdown, and
 * crash recovery (which is rather like shutdown followed by startup).
 *
 * After doing all the postmaster initialization work, we enter PM_STARTUP
 * state and the startup process is launched. The startup process begins by
 * reading the control file and other preliminary initialization steps.
 * In a normal startup, or after crash recovery, the startup process exits
 * with exit code 0 and we switch to PM_RUN state.

pmState transitions are driven by signal delivery and handling; after the startup process exits, reaper is activated to reap it.

The reaper function handles the normal exit of the startup child process:

if (pid == StartupPID)
{
    StartupPID = 0;
    ...
    /*
     * Startup succeeded, commence normal operations
     */
    StartupStatus = STARTUP_NOT_RUNNING;    /* changes from STARTUP_RUNNING to STARTUP_NOT_RUNNING */
    FatalError = false;                     /* only after none of the if branches above matched is this not fatal */
    AbortStartTime = 0;
    ReachedNormalRunning = true;
    pmState = PM_RUN;                       /* the state machine goes from PM_STARTUP to PM_RUN */
    connsAllowed = ALLOW_ALL_CONNS;

    /*
     * Crank up the background tasks, if we didn't do that already
     * when we entered consistent recovery state.  It doesn't matter
     * if this fails, we'll just try again later.
     */
    /* the following start the core child processes */
    if (CheckpointerPID == 0)
        CheckpointerPID = StartCheckpointer();
    if (BgWriterPID == 0)
        BgWriterPID = StartBackgroundWriter();
    if (WalWriterPID == 0)
        WalWriterPID = StartWalWriter();

    /*
     * Likewise, start other special children as needed.  In a restart
     * situation, some of them may be alive already.
     */
    /* the following start the non-core child processes */
    if (!IsBinaryUpgrade && AutoVacuumingActive() && AutoVacPID == 0)
        AutoVacPID = StartAutoVacLauncher();
    if (PgArchStartupAllowed() && PgArchPID == 0)
        PgArchPID = pgarch_start();
    if (PgStatPID == 0)
        PgStatPID = pgstat_start();

    /* workers may be scheduled to start now */
    maybe_start_bgworkers();

    /* only now is the database truly able to accept connections */
    /* at this point we are really open for business */
    ereport(LOG,
            (errmsg("database system is ready to accept connections")));

    /* Report status */
    AddToDataDirLockFile(LOCK_FILE_LINE_PM_STATUS, PM_STATUS_READY);
#ifdef USE_SYSTEMD
    sd_notify(0, "READY=1");
#endif

    continue;
}

The "database system is ready to accept connections" message comes from right here.

The checkpointer, bgwriter, walwriter, autovacuum, archiver (if enabled), and stats collector processes all need to be started, but launching them at this point does not have to succeed: they can be retried later in ServerLoop or on a subsequent pass through reaper. Only the startup process must be forked successfully in one go and complete all of its work:

if (pid < 0)
{
    /* in parent, fork failed */
    int         save_errno = errno;

    errno = save_errno;
    switch (type)
    {
        case StartupProcess:
            ereport(LOG,
                    (errmsg("could not fork startup process: %m")));
            break;
        case BgWriterProcess:
            ereport(LOG,
                    (errmsg("could not fork background writer process: %m")));
            break;
        case CheckpointerProcess:
            ereport(LOG,
                    (errmsg("could not fork checkpointer process: %m")));
            break;
        case WalWriterProcess:
            ereport(LOG,
                    (errmsg("could not fork WAL writer process: %m")));
            break;
        case WalReceiverProcess:
            ereport(LOG,
                    (errmsg("could not fork WAL receiver process: %m")));
            break;
        default:
            ereport(LOG,
                    (errmsg("could not fork process: %m")));
            break;
    }

    /*
     * fork failure is fatal during startup, but there's no need to choke
     * immediately if starting other child types fails.
     */
    if (type == StartupProcess)
        ExitPostmaster(1);
    return 0;
}

Differences in spill generation logic across versions

In every version, it is the largest transaction that gets spilled; here we focus on when the spill is triggered.

PG 12: the number of in-memory changes is hard-coded at 4096.

static const Size max_changes_in_memory = 4096;
/*
 * Check whether the transaction tx should spill its data to disk.
 */
static void
ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
    /*
     * TODO: improve accounting so we cheaply can take subtransactions into
     * account here.
     */
    if (txn->nentries_mem >= max_changes_in_memory)
    {
        ReorderBufferSerializeTXN(rb, txn);
        Assert(txn->nentries_mem == 0);
    }
}

PG 13: spills once memory usage exceeds logical_decoding_work_mem.

static void
ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
{
    ...
    while (rb->size >= logical_decoding_work_mem * 1024L)
    {
        /*
         * Pick the largest transaction (or subtransaction) and evict it from
         * memory by serializing it to disk.
         */
        txn = ReorderBufferLargestTXN(rb);
        ReorderBufferSerializeTXN(rb, txn);
        ...
    }

PG 14: adds streaming via ReorderBufferStreamTXN.

static void
ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
{
    ...
    while (rb->size >= logical_decoding_work_mem * 1024L)
    {
        /*
         * Pick the largest transaction (or subtransaction) and evict it from
         * memory by streaming, if possible.  Otherwise, spill to disk.
         */
        if (ReorderBufferCanStartStreaming(rb) &&
            (txn = ReorderBufferLargestTopTXN(rb)) != NULL)
        {
            ...
            ReorderBufferStreamTXN(rb, txn);
        }
        else
        {
            ...
            ReorderBufferSerializeTXN(rb, txn);
        }
        ...
    }

Although PG 14 adds streaming, it is only triggered under certain conditions:

/* Returns true, if the streaming can be started now, false, otherwise. */
static inline bool
ReorderBufferCanStartStreaming(ReorderBuffer *rb)
{
    LogicalDecodingContext *ctx = rb->private_data;
    SnapBuild  *builder = ctx->snapshot_builder;

    /* We can't start streaming unless a consistent state is reached. */
    if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
        return false;

    /*
     * We can't start streaming immediately even if the streaming is enabled
     * because we previously decoded this transaction and now just are
     * restarting.
     */
    if (ReorderBufferCanStream(rb) &&
        !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
        return true;

    return false;
}
/*
 * Found a point after SNAPBUILD_FULL_SNAPSHOT where all transactions that
 * were running at that point finished. Till we reach that we hold off
 * calling any commit callbacks.
 */
SNAPBUILD_CONSISTENT = 2

Additional conditions for triggering streaming:

  • Condition 1: all transactions that were running at the snapshot point have finished (presumably meaning committed or rolled back)

  • Condition 2: the context is private data (does this mean that streaming will not trigger when two replication links share one table?)

  • Condition 3: the transaction in the snapshot must not be one that can be skipped (probably meaning that certain special transactions can be ignored and are simply not streamed)

PG 15: much the same as 14, just with clearer functions and less nesting.

PG 16: largely the same.

PG 17: largely the same, but adds DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, which can force streaming.

Key points to remember:

  • PG 12 and earlier use a hard-coded limit of 4096 changes
  • PG 13 adds the logical_decoding_work_mem parameter, so the memory limit can be raised to reduce the probability of spilling (see the sketch after this list)
  • PG 14 and later support streaming
  • Streaming has its own trigger conditions, so spills can still occur even when streaming is available
  • PG 17 adds the debug_logical_replication_streaming parameter to force streaming
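
As a practical aside, on PG 13+ the decoding memory limit can be inspected and raised online, and on PG 14+ pg_stat_replication_slots exposes per-slot spill/stream counters. A monitoring and tuning sketch (the 256MB value below is only an illustrative assumption):

-- PG 13+: check and raise the decoding memory limit
show logical_decoding_work_mem;
alter system set logical_decoding_work_mem = '256MB';
select pg_reload_conf();

-- PG 14+: how many transactions each slot has spilled or streamed, and how many bytes
select slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes
from pg_stat_replication_slots;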

Spill file cleanup logic

Cleaning up spills at database startup is only one scenario; spills are also cleaned when a walsender starts and when a slot is dropped.

Cleanup at walsender startup

ReorderBufferCleanupSerializedTXNs is called both at database startup (before any walsender has started) and at walsender startup (while the database is running). Note that these are two different scenarios that merely call the same function. As the function comment says, its purpose is to "remove any leftover serialized reorder buffers", i.e. to clean up spill files.

/*
 * Remove any leftover serialized reorder buffers from a slot directory after a
 * prior crash or decoding session exit.
 */
static void
ReorderBufferCleanupSerializedTXNs(const char *slotname)
{
    DIR        *spill_dir;
    struct dirent *spill_de;
    struct stat statbuf;
    char        path[MAXPGPATH * 2 + 12];

    sprintf(path, "pg_replslot/%s", slotname);

    /* we're only handling directories here, skip if it's not ours */
    if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
        return;

    spill_dir = AllocateDir(path);
    while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
    {
        /* only look at names that can be ours */
        /* only the first 3 characters are compared */
        if (strncmp(spill_de->d_name, "xid", 3) == 0)
        {
            snprintf(path, sizeof(path), "pg_replslot/%s/%s",
                     slotname, spill_de->d_name);

            if (unlink(path) != 0)
                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
                                path, slotname)));
        }
    }
    FreeDir(spill_dir);
}

Two things are worth noting about this cleanup logic:

  • Only files whose names start with "xid" are removed; obviously the state file must not be removed
  • The cleanup uses unlink, one file at a time. Keeping this in mind helps us design a way to speed up startup

Cleanup at database startup

At database startup, a startup process is forked to clean the slots, using the same cleanup function that walsender calls: ReorderBufferCleanupSerializedTXNs.

Another difference: when a walsender restarts, it only cleans the spills of the slot with the same name, whereas database startup cleans the spills of all slots, one slot after another.

The startup process's while loop that cleans the slots in order:

void
StartupReorderBuffer(void)
{
    DIR        *logical_dir;
    struct dirent *logical_de;

    logical_dir = AllocateDir("pg_replslot");
    while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
    {
        /* skip . and .. */
        if (strcmp(logical_de->d_name, ".") == 0 ||
            strcmp(logical_de->d_name, "..") == 0)
            continue;

        /* validate that the slot name is well-formed */
        /* if it cannot be a slot, skip the directory */
        if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
            continue;

        /*
         * ok, has to be a surviving logical slot, iterate and delete
         * everything starting with xid-*
         */
        ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
    }
    FreeDir(logical_dir);
}

The while loop calls ReorderBufferCleanupSerializedTXNs, after which the logic is the same as the walsender startup cleanup.

Manual cleanup via pg_drop_replication_slot

The cleanup logic for dropping a slot differs from the automatic spill cleanup: it does not go through ReorderBufferCleanupSerializedTXNs.

The drop-slot flow is:

pg_drop_replication_slot(PG_FUNCTION_ARGS)
  -> ReplicationSlotDrop(const char *name, bool nowait)
  -> ReplicationSlotDropAcquired(void)
  -> ReplicationSlotDropPtr

The way ReplicationSlotDropPtr cleans up a replication slot is also quite interesting:

/*
 * Permanently drop the replication slot which will be released by the point
 * this function returns.
 */
static void
ReplicationSlotDropPtr(ReplicationSlot *slot)
{
    char        path[MAXPGPATH];
    char        tmppath[MAXPGPATH];

    /*
     * If some other backend ran this code concurrently with us, we might try
     * to delete a slot with a certain name while someone else was trying to
     * create a slot with the same name.
     */
    LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

    /* Generate pathnames. */
    sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
    sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

    /*
     * Rename the slot directory on disk, so that we'll no longer recognize
     * this as a valid slot.  Note that if this fails, we've got to mark the
     * slot inactive before bailing out.  If we're dropping an ephemeral or a
     * temporary slot, we better never fail hard as the caller won't expect
     * the slot to survive and this might get called during error handling.
     */
    if (rename(path, tmppath) == 0)     /* rename the directory */
    {
        /*
         * We need to fsync() the directory we just renamed and its parent to
         * make sure that our changes are on disk in a crash-safe fashion.  If
         * fsync() fails, we can't be sure whether the changes are on disk or
         * not.  For now, we handle that by panicking;
         * StartupReplicationSlots() will try to straighten it out after
         * restart.
         */
        /* fsync to persist the rename */
        START_CRIT_SECTION();
        fsync_fname(tmppath, true);
        fsync_fname("pg_replslot", true);
        END_CRIT_SECTION();
    }
    ...
    /*
     * If removing the directory fails, the worst thing that will happen is
     * that the user won't be able to create a new slot with the same name
     * until the next server restart.  We warn about it, but that's all.
     */
    if (!rmtree(tmppath, true))
        ereport(WARNING,
                (errmsg("could not remove directory \"%s\"", tmppath)));

    /*
     * We release this at the very end, so that nobody starts trying to create
     * a slot while we're still cleaning up the detritus of the old one.
     */
    LWLockRelease(ReplicationSlotAllocationLock);
}

Dropping a slot does not directly unlink the files under the slot directory. It first renames the slot directory slotname/ to slotname.tmp/, then unlinks the files under that directory, and finally removes the slotname.tmp/ directory itself.

rmtree, in turn, also unlinks the files in a loop.

Testing ways to speed up startup after a slot has spilled

Deleting 10 million spill files is bound to be slow, whereas simply mv-ing the directory is very fast. But a direct mv requires care about the new directory name and the state file, and you need to know exactly which source-code step the mv skips.

Naming considerations for mv

Because the shutdown was not clean, the startup process will run SyncDataDirectory to fsync and stat every file under the data directory; this part is hard to bypass. Only after SyncDataDirectory finishes does it start processing the replication slots, calling StartupReorderBuffer() -> ReorderBufferCleanupSerializedTXNs to clean up all spill files.

Before entering the cleanup, ReplicationSlotValidateName is called to check that the slot name is valid. We can exploit ReplicationSlotValidateName to trick the startup process into skipping the ReorderBufferCleanupSerializedTXNs step.

The ReplicationSlotValidateName rules:

bool
ReplicationSlotValidateName(const char *name, int elevel)
{
    ...
    for (cp = name; *cp; cp++)
    {
        /* the key rule is here */
        if (!((*cp >= 'a' && *cp <= 'z')
              || (*cp >= '0' && *cp <= '9')
              || (*cp == '_')))
        {
            ereport(elevel,
                    (errcode(ERRCODE_INVALID_NAME),
                     errmsg("replication slot name \"%s\" contains invalid character",
                            name),
                     errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
            return false;
        }
    }
    return true;
}

A valid slot name may only contain a-z, 0-9, and _.

So when renaming, it is recommended to include a dot (.):

  • Recommended: slotname.bak, slotname.20241215, and the like

  • Not recommended: slotnamebackup, slotname20241215, slotname_bak, and so on (these are still valid slot names, so the spills would still be cleaned)

  • Do not use a .tmp suffix: slotname.tmp has a special meaning (it is what drop slot renames the directory to)

After the rename, you must recreate the slot directory and copy the state file back, otherwise the slot will behave very strangely after startup (duplicate slot names, an automatically generated slot name, a slot that cannot be dropped, the downstream link failing to come up, and so on).

The recommended mv procedure is:

cd pg_replslot
mv slotname slotname.bak
mkdir slotname
cp slotname.bak/state slotname/
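
After the database comes back up, it is worth confirming that the slot still looks sane before restarting the downstream link. A quick check (a sketch; 'slotname' is a placeholder):

-- confirm the slot survived the mv and its positions are intact
select slot_name, slot_type, database, active, restart_lsn, confirmed_flush_lsn
from pg_replication_slots
where slot_name = 'slotname';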

Startup time comparison

Compare startup times across the different code paths to see whether a manual mv/rm actually speeds up startup in a meaningful way.

Based on the source-code logic above:

  • Clean shutdown: startup does not go through fsync and stat
  • Crash (non-clean) shutdown: startup goes through fsync and stat
  • Valid mv: the slot directory is renamed with a .bak suffix, so unlink is skipped
  • Invalid mv: the slot directory is renamed with a _bak suffix (still a valid slot name) and the spill files still start with xid, so unlink is performed

Because generating spill files the normal way is far too slow, the slot directories and spill files were hand-crafted for this test: 50 slots in total, 400,000 spill files per slot, 20 million spill files overall, to measure startup time (cp-ing a whole directory is much faster than cp-ing individual files or dd-ing them).

No.   Test scenario                                                            Startup time
1     Clean shutdown; startup does no fsync/stat and no unlink                 0.1 s
2     Clean shutdown, invalid mv; no fsync/stat, unlink performed              11 min 41 s
3     Crash shutdown, valid mv; fsync/stat performed, no unlink                4 min 35 s
4     Crash shutdown, invalid mv; fsync/stat performed, unlink performed       32 min 2 s
5     Crash shutdown, rm (slot directory recreated and state file kept)        13 min 4 s

Comparing scenarios 3 and 5: in theory, with a valid mv the original environment could have started in about 4 minutes, versus about 13 minutes with rm. (This is a rough comparison; the restored environment was already observed to differ in some respects.)

Last modified: 2025-01-06 11:12:13