问题现象-起库缓慢
版本pg13.2
数据库启动缓慢,startup进程在读取spill文件,文件名在变化。查看spill文件也很慢,ls -l最后跑出来有800w个文件spill文件。
为什么有上千万个spill文件
wal段和LSN的含义
LSN
LSN总体是一个64位的bigint,LSN实际长这样42D3B/1732C540(hex),斜杠/前是32位逻辑日志号,/后32位是段号+块号+块内偏移。这4个部分分别是:
| 32位 | 8位 | 11位 | 13位 |
|---|---|---|---|
| 逻辑日志号 | 日志段号 | 块号 | 块内偏移 |
块内偏移 8192=2^13
块号=16M(默认wal段大小)/8192
wal segment
wal文件名由3组16进制数字组成。
以8k的wal文件0000000300042D3B00000002为例:
| 32位 | 32位 | 32位 |
|---|---|---|
| timeline | 逻辑日志号 | 日志段号 |
| 00000003 | 00042D3B | 00000002 |
可以看出LSN可以定位到wal文件名及文件中offset位置。
其中,LSN斜杠/前是逻辑日志号,斜杠/后8位的日志段号下面都会用到。
spill文件名转换
复制槽名:logical_ex2209_rep
spill文件名:xid-407989064-lsn-42D1E-20000000.spill
42D1E不是一个完整的LSN,不能直接用pg_walfile_name来定位wal文件名。42D1E是一个逻辑日志号,如果直接过滤文件名含42D1E的wal文件,可以找到16个wal文件。
能否通过数字20000000定位到wal日志段号从而定位到哪一个文件呢?
spill文件名:
/*
* Given a replication slot, transaction ID and segment number, fill in the
* corresponding spill file into 'path', which is a caller-owned buffer of size
* at least MAXPGPATH.
*/
static void
ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
XLogSegNo segno)
{
XLogRecPtr recptr;
XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
NameStr(MyReplicationSlot->data.name),
xid,
(uint32) (recptr >> 32), (uint32) recptr);
}
pg_replslot/%s和xid-%u-lsn比较好理解,就是复制槽名称和xid。后面的recptr还要再看下定义:
/*
* Pointer to a location in the XLOG. These pointers are 64 bits wide,
* because we don't want them ever to overflow.
*/
typedef uint64 XLogRecPtr;
XLogSegNoOffsetToRecPtr是通过wal日志段号、段大小、offset计算LSN:
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest) \
(dest) = (segno) * (wal_segsz_bytes) + (offset)
XLogRecPtr就是LSN!那么
(uint32) (recptr >> 32)表示取LSN前32位,(uint32) recptr)取LSN后32位。
LSN前32位也就是刚才看到的LSN前半段,lsn-42D1E;LSN后32位实际上信息多了,这里只要LSN后32位中的前几位的段号即可。
因为传入的offset=0,也传入了segno,那么根本不需要wal日志段内偏移量信息,就可以计算出dest的值。wal_segsz_bytes的真实值是128M*1014*1024,将XLogSegNoOffsetToRecPtr中的式子转化下为:
segno= dest/(128*1024*1024)
-- 再把16进制20000000转化下
segno= x'20000000'::int/(128*1024*1024)
segno= 4
可以从式子算出日志段号segno,也就可以定位到wal文件号了。
所以,这里的spill文件名:xid-407989064-lsn-42D1E-20000000.spill对应的wal文件为
逻辑日志号=42D1E,段号=04:
ls 42D1E*04 0000000200042D1E00000004
pg_waldump可以看到xid 407989064在里面。
实际上wal大小在实例创建后也是固定的,即(128*1024*1024)是一个常量,那么segno跟(uint32) recptr绝对相关,但不相等。也就是说切换一个wal日志就会切换一个spill。
最后总结spill文件生成规则如下:
- 同一个事务id,如果跨wal就会产生多个spill。如:一个不含子事务的大事务跨越3个wal,就会对应3个spill文件
- 不同的事务id对应不同的spill。如:1000w个子事务对应1000w个spill
spill文件名结构xid-407989064-lsn-42D1E-20000000.spill:
| xid | lsn前32位;即wal逻辑日志号 | 由wal日志段号换算;不等于段号 |
|---|---|---|
| xid-407989064 | lsn-42D1E | 20000000 |
# 恢复出的环境
[postgres]$ ll |head -100
total 40000276
-rw------- 1 postgres postgres 184 Dec 6 15:20 state
-rw------- 1 postgres postgres 196 Dec 6 13:25 xid-407989064-lsn-42D1E-0.spill
-rw------- 1 postgres postgres 208 Dec 6 13:25 xid-407989064-lsn-42D1E-20000000.spill
...
-rw------- 1 postgres postgres 540 Dec 6 16:44 xid-407989064-lsn-42D2A-D0000000.spill
-rw------- 1 postgres postgres 152 Dec 6 13:09 xid-407989065-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec 6 13:09 xid-407989066-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec 6 13:09 xid-407989068-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec 6 13:09 xid-407989070-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec 6 13:09 xid-407989072-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec 6 13:09 xid-407989076-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec 6 13:09 xid-407989079-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec 6 13:09 xid-407989080-lsn-42D1D-C8000000.spill
-rw------- 1 postgres postgres 152 Dec 6 13:09 xid-407989082-lsn-42D1D-C8000000.spill
[postgres@lzlhost /myhost/liuzhilong/pg_replslot/logical_ex9e15_rep]$ ll |awk '{print $9}'|awk -F '-' '{print $2}'|sort|uniq -c|wc -l
10000003
[postgres@lzlhost /myhost/liuzhilong/pg_replslot/logical_ex9e15_rep]$ ll |wc -l
10000070
所以我们在实际环境中看到了10000070个文件,文件的distinct xid有10000003个,也就是说1个父事务跨越约70个wal文件,这个父事务有1000w个子事务。
复制槽溢出测试
--发布订阅搭建复制链路
logical_decoding_work_mem = 64MB #pg_ctl reload
wal_segment_size =128 MB
--source
CREATE TABLE replication_table (
id BIGSERIAL PRIMARY KEY,
column1 char(2000),
column2 char(2000),
column3 char(2000)
);
create publication pub_test for table replication_table ;
--dest
CREATE TABLE replication_table (
id BIGSERIAL PRIMARY KEY,
column1 char(2000),
column2 char(2000),
column3 char(2000)
);
CREATE SUBSCRIPTION sub_test
CONNECTION 'host=127.0.0.1 port=8094 dbname=lzl user=lzl password=qwer'
PUBLICATION pub_test;
--source
select * from pg_replication_slots;
大事务、无子事务、复制表溢出测试
--创建一个大事务暂时不提交
begin;
insert into replication_table(column1,column2,column3)
select 'a','b','c' from generate_series(1,1000000) g;
--复制槽溢出
ll
total 331924
-rw------- 1 postgres postgres 184 Dec 9 20:22 state
-rw------- 1 postgres postgres 88226964 Dec 9 20:22 xid-5074343-lsn-163-38000000.spill
-rw------- 1 postgres postgres 119698488 Dec 9 20:22 xid-5074343-lsn-163-40000000.spill
大事务提交后,等待消费直至复制链路延迟为0,spill文件消失
M=# select pid,usename,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,reply_time from pg_stat_replication;
pid | usename | sent_lsn | write_lsn | flush_lsn | replay_lsn | write_lag | flush_lag | replay_lag | reply_time
--------+---------+--------------+--------------+--------------+--------------+-----------+-----------+------------+------------------------------
163525 | lzl | 163/4996E1C8 | 163/4996E1C8 | 163/4996E1C8 | 163/4996E1C8 | [null] | [null] | [null] | 2024-12-09 20:25:35.14769+08
(1 row)
M=# select pid,usename,pg_wal_lsn_diff(pg_current_wal_lsn(),sent_lsn) diff_sent_mb,pg_wal_lsn_diff(pg_current_wal_lsn(),write_lsn) diff_write_mb,pg_wal_lsn_diff(pg_current_wal_lsn(),flush_lsn) diff_flush_mb,pg_wal_lsn_diff(pg_current_wal_lsn(),replay_lsn) diff_replay_mb,pg_walfile_name_offset(sent_lsn) sentoffset,pg_walfile_name_offset(write_lsn) writeoffset,pg_walfile_name_offset(flush_lsn) flush_lsn from pg_stat_replication;
pid | usename | diff_sent_mb | diff_write_mb | diff_flush_mb | diff_replay_mb | sentoffset | writeoffset | flush_lsn
--------+---------+--------------+---------------+---------------+----------------+-------------------------------------+-------------------------------------+-------------------------------
163525 | lzl | 0 | 0 | 0 | 0 | (000000010000016300000009,26665416) | (000000010000016300000009,26665416) | (000000
[/mypg/pg8094/data/pg_replslot/sub_test]$ ll
total 357392
-rw------- 1 postgres postgres 184 Dec 9 20:23 state
-rw------- 1 postgres postgres 88226964 Dec 9 20:22 xid-5074343-lsn-163-38000000.spill
-rw------- 1 postgres postgres 137696328 Dec 9 20:23 xid-5074343-lsn-163-40000000.spill
-rw------- 1 postgres postgres 26076708 Dec 9 20:23 xid-5074343-lsn-163-48000000.spill
[/mypg/pg8094/data/pg_replslot/sub_test]$ ll
total 4
-rw------- 1 postgres postgres 184 Dec 9 20:25 state2666
(1 row)
大事务、无子事务、非复制表溢出测试
--source 创建一个不相干的表,准备写入数据
CREATE TABLE no_replication_table (
id BIGSERIAL PRIMARY KEY,
column1 char(2000),
column2 char(2000),
column3 char(2000)
);
--创建一个大事务暂时不提交
begin;
insert into no_replication_table(column1,column2,column3)
select 'a','b','c' from generate_series(1,1000000) g;
--溢出
[postgres@lzldb:MYINST:8094 /mypg/pg8094/data/pg_replslot/sub_test]$ ll
total 357492
-rw------- 1 postgres postgres 184 Dec 9 20:09 state
-rw------- 1 postgres postgres 107511456 Dec 9 20:08 xid-5074106-lsn-163-28000000.spill
-rw------- 1 postgres postgres 137698804 Dec 9 20:09 xid-5074106-lsn-163-30000000.spill
-rw------- 1 postgres postgres 4308444 Dec 9 20:09 xid-5074106-lsn-163-38000000.spill
大事务、子事务、非复制表溢出测试
# 一次insert一行,每个insert一个子事务
echo "begin;">subtx.sql
for i in {1..1000000}
do
echo "savepoint p$i;">>subtx.sql
echo "insert into no_replication_table(column1,column2,column3) select 'a','b','c';">>subtx.sql
done
nohup psql -d lzl -f subtx.sql &
#执行过程中,观察到溢出80w+文件
[/myhost/pg8094/data/pg_replslot/sub_test]$ ll |wc -l
823749
[/myhost/pg8094/data/pg_replslot/sub_test]$ ll |head -10
total 1099532
-rw------- 1 postgres postgres 184 Dec 9 21:10 state
-rw------- 1 postgres postgres 1236 Dec 9 21:10 xid-5519686-lsn-163-70000000.spill
-rw------- 1 postgres postgres 252 Dec 9 21:09 xid-5519687-lsn-163-70000000.spill
-rw------- 1 postgres postgres 252 Dec 9 21:09 xid-5519688-lsn-163-70000000.spill
-rw------- 1 postgres postgres 252 Dec 9 21:09 xid-5519689-lsn-163-70000000.spill
-rw------- 1 postgres postgres 252 Dec 9 21:09 xid-5519690-lsn-163-70000000.spill
-rw------- 1 postgres postgres 252 Dec 9 21:09 xid-5519691-lsn-163-70000000.spill
-rw------- 1 postgres postgres 252 Dec 9 21:09 xid-5519692-lsn-163-70000000.spill
-rw------- 1 postgres postgres 252 Dec 9 21:09 xid-5519693-lsn-163-70000000.spill
数据库启动慢分析
startup进程起库流程分析
这里以堆栈编号逐栈解析起库流程:
11:main:没啥好说的
10:PostmasterMain:
在主循环前,会先调用起库流程StartupPID = StartupDataBase();本质上是调用StartChildProcess(StartupProcess)
#define StartupDataBase() StartChildProcess(StartupProcess)
9:StartChildProcess :fork一个进程。该进程为启动postmaster的辅助进程,正常的子进程启动都走这个逻辑,在这一步fork。这里的入参AuxProcType=StartupProcess
8:AuxiliaryProcessMain:
因为MyAuxProcType=StartupProcess,所以走的是StartupProcessMain流程,这不同于walsender,walwrite,bgwriter这些子进程的流程。startup进程本身是为了宕机恢复读wal的进程,但是它还做了很多事情
switch (MyAuxProcType)
{
case CheckerProcess:
/* don't set signals, they're useless here */
CheckerModeMain();
proc_exit(1); /* should never return */
case BootstrapProcess:
/*
* There was a brief instant during which mode was Normal; this is
* okay. We need to be in bootstrap mode during BootStrapXLOG for
* the sake of multixact initialization.
*/
SetProcessingMode(BootstrapProcessing);
bootstrap_signals();
BootStrapXLOG();
BootstrapModeMain();
proc_exit(1); /* should never return */
case StartupProcess: //这里这里这里这里
/* don't set signals, startup process has its own agenda */
StartupProcessMain();
proc_exit(1); /* should never return */
case BgWriterProcess:
/* don't set signals, bgwriter has its own agenda */
BackgroundWriterMain();
proc_exit(1); /* should never return */
case CheckpointerProcess:
/* don't set signals, checkpointer has its own agenda */
CheckpointerMain();
proc_exit(1); /* should never return */
case WalWriterProcess:
/* don't set signals, walwriter has its own agenda */
InitXLOGAccess();
WalWriterMain();
proc_exit(1); /* should never return */
case WalReceiverProcess:
/* don't set signals, walreceiver has its own agenda */
WalReceiverMain();
proc_exit(1); /* should never return */
default:
elog(PANIC, "unrecognized process type: %d", (int) MyAuxProcType);
proc_exit(1);
}
7:StartupProcessMain:主要是为了调用StartupXLOG()
6:StartupXLOG:
函数注释:
This must be called ONCE during postmaster or standalone-backend startup
StartupXLOG无论怎样都会被postmaster调用,无论是否是崩溃停库还是一致性停库
switch (ControlFile->state)
{
...
case DB_IN_PRODUCTION:
ereport(LOG,
(errmsg("database system was interrupted; last known up at %s",
str_time(ControlFile->time))));
break;
这跟log日志能对上,以下是log的停库起库输出:
2024-12-06 17:02:57.534 CST,,,447560,,65693cde.6d448,1325,,2023-12-01 09:54:38 CST,,0,LOG,00000,"database system is shut down",,,,,,,,,"","postmaster" 2024-12-06 17:03:49.536 CST,,,211844,,6752bdf3.33b84,1,,2024-12-06 17:03:47 CST,,0,LOG,00000,"ending log output to stderr",,"Future log output will go to log destination ""csvlog"".",,,,,,,"","postmaster" 2024-12-06 17:03:49.536 CST,,,211844,,6752bdf3.33b84,2,,2024-12-06 17:03:47 CST,,0,LOG,00000,"starting PostgreSQL 13.2 (RaseSQL 1.3) on x86_64-pc-linux-gnu, compiled by gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-39.0.1), 64-bit",,,,,,,,,"","postmaster" 2024-12-06 17:03:49.537 CST,,,211844,,6752bdf3.33b84,3,,2024-12-06 17:03:47 CST,,0,LOG,00000,"listening on IPv4 address ""0.0.0.0"", port 7284",,,,,,,,,"","postmaster" 2024-12-06 17:03:49.539 CST,,,211844,,6752bdf3.33b84,4,,2024-12-06 17:03:47 CST,,0,LOG,00000,"listening on Unix socket ""/tmp/.s.PGSQL.7284""",,,,,,,,,"","postmaster" 2024-12-06 17:03:49.557 CST,,,211995,,6752bdf5.33c1b,1,,2024-12-06 17:03:49 CST,,0,LOG,00000,"database system was interrupted; last known up at 2024-12-06 17:00:10 CST",,,,,,,,,"","startup"
所以,当时停库后,控制文件记录的数据库状态为in production
Database cluster state: in production
in production这个状态是数据库正在运行,而不是正常的shutdown状态,说明当时数据库停库时不是一致性停库。
继续其中关于fsync的关键代码:
/*----------
* If we previously crashed, perform a couple of actions:
*
* - The pg_wal directory may still include some temporary WAL segments
* used when creating a new segment, so perform some clean up to not
* bloat this path. This is done first as there is no point to sync
* this temporary data.
*
* - There might be data which we had written, intending to fsync it, but
* which we had not actually fsync'd yet. Therefore, a power failure in
* the near future might cause earlier unflushed writes to be lost, even
* though more recent data written to disk from here on would be
* persisted. To avoid that, fsync the entire data directory.
*/
if (ControlFile->state != DB_SHUTDOWNED &&
ControlFile->state != DB_SHUTDOWNED_IN_RECOVERY)
{
RemoveTempXlogFiles();
SyncDataDirectory();
}
这里因为控制文件记录的状态不是正常停库的,所以走到if中调用SyncDataDirectory()做fsync持久化。
StartupXLOG做了很多很多事,其中跟spill相关的除了SyncDataDirectory()还有StartupReorderBuffer():
/*
* Initialize replication slots, before there's a chance to remove
* required resources.
*/
StartupReplicationSlots();
/*
* Startup logical state, needs to be setup now so we have proper data
* during crash recovery.
*/
StartupReorderBuffer();
StartupReorderBuffer虽然也会被调用,它会调用ReorderBufferCleanupSerializedTXNs清理所有slot目录的spill文件(不是删除目录和state文件)
/*
* Delete all data spilled to disk after we've restarted/crashed. It will be
* recreated when the respective slots are reused.
*/
void
StartupReorderBuffer(void)
{
DIR *logical_dir;
struct dirent *logical_de;
logical_dir = AllocateDir("pg_replslot");
while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
{
if (strcmp(logical_de->d_name, ".") == 0 ||
strcmp(logical_de->d_name, "..") == 0)
continue;
/* if it cannot be a slot, skip the directory */
if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
continue;
/*
* ok, has to be a surviving logical slot, iterate and delete
* everything starting with xid-*
*/
ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
}
FreeDir(logical_dir);
}
5:SyncDataDirectory:
这段函数注释非常重要:
/*
* Issue fsync recursively on PGDATA and all its contents.
*
* We fsync regular files and directories wherever they are, but we
* follow symlinks only for pg_wal and immediately under pg_tblspc.
* Other symlinks are presumed to point at files we're not responsible
* for fsyncing, and might not have privileges to write at all.
*
* Errors are logged but not considered fatal; that's because this is used
* only during database startup, to deal with the possibility that there are
* issued-but-unsynced writes pending against the data directory. We want to
* ensure that such writes reach disk before anything that's done in the new
* run. However, aborting on error would result in failure to start for
* harmless cases such as read-only files in the data directory, and that's
* not good either.
*
* Note that if we previously crashed due to a PANIC on fsync(), we'll be
* rewriting all changes again during recovery.
*
* Note we assume we're chdir'd into PGDATA to begin with.
*/
- fsync所有data目录文件使之持久化
- 这个动作只会发生在起库阶段
- 这个动作是为了保证在数据库运行前data目录是完全持久化的
SyncDataDirectory主体是递归遍历目录并fsync(link文件稍微特殊处理一下):
walkdir(".", datadir_fsync_fname, false, LOG);
if (xlog_is_symlink)
walkdir("pg_wal", datadir_fsync_fname, false, LOG);
walkdir("pg_tblspc", datadir_fsync_fname, true, LOG);
4:walkdir:递归到.
3:walkdir:递归到./pg_replslot
2:walkdir:递归到./pg_replslot/slotname
1:lstat :C库调用。walkdir不仅要做fsync(入参函数datadir_fsync_fname),walkdir函数本体还要做 lstat获取文件信息,如inode、文件大小、最近修改时间等等,类似linux的stat命令。
0:_lxstat:C库调用
起库逻辑汇总:
- pg会启动一个辅助进程
startup以协助起库,不同于在常见的childprocess(walwriter、bgwriter、checkpointer等等)进程,它是起库过程中必定会启动的进程,它会做很多事情 StartupXLOG起库时一定会被调用,无论数据库是否一致性停库- 只有非正常停库状态下,才会触发
SyncDataDirectory SyncDataDirectory会fsync持久化所有data文件,并查看所有data文件的stat信息- fsync是为了在库启动前保证data文件都一致;stat应该是为了验证文件是否正常和可读(在startup进程启动前只验证过datadir目录可读性)
- 无论停库状态,
StartupReorderBuffer一定会被调用并清理所有复制槽的spill文件
什么时候是ready状态
startup进程把活干完后数据库还不是ready状态,在pmState状态机改变状态时会调用reaper回收进程函数。reaper函数本身是为了子进程退出后进行一些回收或者启动工作。pmState状态机记录状态为PM_STARTUP,状态机是控制启停库状态的。
PostmasterMain的最后几步:
StartupPID = StartupDataBase();
Assert(StartupPID != 0);
StartupStatus = STARTUP_RUNNING;
pmState = PM_STARTUP; //状态机改变状态
/* Some workers may be scheduled to start now */
maybe_start_bgworkers();
status = ServerLoop();
/*
* ServerLoop probably shouldn't ever return, but if it does, close down.
*/
ExitPostmaster(status != STATUS_OK);
abort(); /* not reached */
}
PostmasterMain起库的核心流程会走到reaper以处理startup进程的正常退出,
PMState注释:
/*
* We use a simple state machine to control startup, shutdown, and
* crash recovery (which is rather like shutdown followed by startup).
*
* After doing all the postmaster initialization work, we enter PM_STARTUP
* state and the startup process is launched. The startup process begins by
* reading the control file and other preliminary initialization steps.
* In a normal startup, or after crash recovery, the startup process exits
* with exit code 0 and we switch to PM_RUN state.
PMState会被信号传递和处理,startup进程退出后reaper会被激活以回收进程。
reaper函数处理startup子进程的正常退出态:
if (pid == StartupPID)
{
StartupPID = 0;
...
/*
* Startup succeeded, commence normal operations
*/
StartupStatus = STARTUP_NOT_RUNNING; //由STARTUP_RUNNING转成STARTUP_NOT_RUNNING
FatalError = false; //上面一堆if未命中后,才不是fatal的
AbortStartTime = 0;
ReachedNormalRunning = true;
pmState = PM_RUN; //状态机由PM_STARTUP转成PM_RUN
connsAllowed = ALLOW_ALL_CONNS;
/*
* Crank up the background tasks, if we didn't do that already
* when we entered consistent recovery state. It doesn't matter
* if this fails, we'll just try again later.
*/
//以下都在启动核心子进程
if (CheckpointerPID == 0)
CheckpointerPID = StartCheckpointer();
if (BgWriterPID == 0)
BgWriterPID = StartBackgroundWriter();
if (WalWriterPID == 0)
WalWriterPID = StartWalWriter();
/*
* Likewise, start other special children as needed. In a restart
* situation, some of them may be alive already.
*/
//以下都在启动非核心子进程
if (!IsBinaryUpgrade && AutoVacuumingActive() && AutoVacPID == 0)
AutoVacPID = StartAutoVacLauncher();
if (PgArchStartupAllowed() && PgArchPID == 0)
PgArchPID = pgarch_start();
if (PgStatPID == 0)
PgStatPID = pgstat_start();
/* workers may be scheduled to start now */
maybe_start_bgworkers();
//此时才是正式的可接受连接状态
/* at this point we are really open for business */
ereport(LOG,
(errmsg("database system is ready to accept connections")));
/* Report status */
AddToDataDirLockFile(LOCK_FILE_LINE_PM_STATUS, PM_STATUS_READY);
#ifdef USE_SYSTEMD
sd_notify(0, "READY=1");
#endif
continue;
}
“database system is ready to accept connections”信息就在这里了
checkpointer、bgwrite、walwrite、autovacuum、arch(如有)、stat这些进程都需要启动,在这个阶段拉起这些进程不是必须返回成功的,后续也可以在ServerLoop或者再次执行reaper时尝试启动,只有startup进程是必须一次性必须启动并完成所有相关任务的:
if (pid < 0)
{
/* in parent, fork failed */
int save_errno = errno;
errno = save_errno;
switch (type)
{
case StartupProcess:
ereport(LOG,
(errmsg("could not fork startup process: %m")));
break;
case BgWriterProcess:
ereport(LOG,
(errmsg("could not fork background writer process: %m")));
break;
case CheckpointerProcess:
ereport(LOG,
(errmsg("could not fork checkpointer process: %m")));
break;
case WalWriterProcess:
ereport(LOG,
(errmsg("could not fork WAL writer process: %m")));
break;
case WalReceiverProcess:
ereport(LOG,
(errmsg("could not fork WAL receiver process: %m")));
break;
default:
ereport(LOG,
(errmsg("could not fork process: %m")));
break;
}
/*
* fork failure is fatal during startup, but there's no need to choke
* immediately if starting other child types fails.
*/
if (type == StartupProcess)
ExitPostmaster(1);
return 0;
}
spill文件生成逻辑各版本差异
spill在各个版本都是spill最大的事务,这里重点关注啥时候spill的逻辑
PG12:pg12的changes是4096条写死
static const Size max_changes_in_memory = 4096;
/*
* Check whether the transaction tx should spill its data to disk.
*/
static void
ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
/*
* TODO: improve accounting so we cheaply can take subtransactions into
* account here.
*/
if (txn->nentries_mem >= max_changes_in_memory)
{
ReorderBufferSerializeTXN(rb, txn);
Assert(txn->nentries_mem == 0);
}
}
PG13:超过logical_decoding_work_mem内存大小就spill
static void
ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
{
...
while (rb->size >= logical_decoding_work_mem * 1024L)
{
/*
* Pick the largest transaction (or subtransaction) and evict it from
* memory by serializing it to disk.
*/
txn = ReorderBufferLargestTXN(rb);
ReorderBufferSerializeTXN(rb, txn);
...
}
PG14:多个了一个流式传输ReorderBufferStreamTXN
static void
ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
{
...
while (rb->size >= logical_decoding_work_mem * 1024L)
{
/*
* Pick the largest transaction (or subtransaction) and evict it from
* memory by streaming, if possible. Otherwise, spill to disk.
*/
if (ReorderBufferCanStartStreaming(rb) &&
(txn = ReorderBufferLargestTopTXN(rb)) != NULL)
{...
ReorderBufferStreamTXN(rb, txn);
}
else
{...
ReorderBufferSerializeTXN(rb, txn);
}
...
}
14虽然有了流式复制,但是触发是要一定条件的:
/* Returns true, if the streaming can be started now, false, otherwise. */
static inline bool
ReorderBufferCanStartStreaming(ReorderBuffer *rb)
{
LogicalDecodingContext *ctx = rb->private_data;
SnapBuild *builder = ctx->snapshot_builder;
/* We can't start streaming unless a consistent state is reached. */
if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
return false;
/*
* We can't start streaming immediately even if the streaming is enabled
* because we previously decoded this transaction and now just are
* restarting.
*/
if (ReorderBufferCanStream(rb) &&
!SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
return true;
return false;
}
/*
* Found a point after SNAPBUILD_FULL_SNAPSHOT where all transactions that
* were running at that point finished. Till we reach that we hold off
* calling any commit callbacks.
*/
SNAPBUILD_CONSISTENT = 2
额外的steam触发条件:
-
条件1:快照中的事务涵盖的所有事务都已完成(应该指commit or rollback)
-
条件2:context是私有数据(是不是说两条链路一张表就不会触发steam?)
-
条件3:快照中的事务是不可忽略的事务(可能指特殊的事务可以忽略,就不做了)
PG15:跟14差不多,只是函数更清晰,套娃少一些了
PG16:差不多
PG17:差不多,新增一个DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE可以强制stream
记忆点:
- PG12及以前是写死的4096条changes
- PG13新增
logical_decoding_work_mem参数,可调整内存大小以减少spill概率 - PG14及以后支持流式复制Streaming
- 触发流式复制也需要一定的条件,所以即使有流式复制也可能会发生spill
- PG17新增
debug_logical_replication_streaming参数以强制触发流式传输
spill文件清理逻辑
起库时清理spill其实只是一种场景,还有启动walsender清理和drop slot清理。
walsender启动时清理
ReorderBufferCleanupSerializedTXNs会在数据库启动(walsender还没有启动)、walsender启动(数据库运行中)时被调用,注意这两部分场景是不一样的,只是他们会调用同一个函数。从函数注释部分也可以看出,该函数是为了“删除残留的序列化的reorder buffers”,即清理spill文件。
/*
* Remove any leftover serialized reorder buffers from a slot directory after a
* prior crash or decoding session exit.
*/
static void
ReorderBufferCleanupSerializedTXNs(const char *slotname)
{
DIR *spill_dir;
struct dirent *spill_de;
struct stat statbuf;
char path[MAXPGPATH * 2 + 12];
sprintf(path, "pg_replslot/%s", slotname);
/* we're only handling directories here, skip if it's not ours */
if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
return;
spill_dir = AllocateDir(path);
while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
{
/* only look at names that can be ours */
//只对比前3个字符
if (strncmp(spill_de->d_name, "xid", 3) == 0)
{
snprintf(path, sizeof(path),
"pg_replslot/%s/%s", slotname,
spill_de->d_name);
if (unlink(path) != 0)
ereport(ERROR,
(errcode_for_file_access(),mkdir
errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
path, slotname)));
}
}
FreeDir(spill_dir);
}
以上清理逻辑需要注意两点:
- 清理文件名以“xid”开头的文件。很明显state文件是不能清理的
- unlink清理,一次清理一个文件。考虑这一点可以帮助我们构建加速起库方案
数据库启动时清理
数据库启动时会fork一个startup进程来清理slot,清理函数跟walsender调用的清理函数一致:ReorderBufferCleanupSerializedTXNs。
还有一个区别在于,walsender重启后,只会清理当前同名slot spill;而数据库启动时会顺序清理所有slot spill。
数据库启动startup进程,while顺序清理逻辑:
void
StartupReorderBuffer(void)
{
DIR *logical_dir;
struct dirent *logical_de;
logical_dir = AllocateDir("pg_replslot");
while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
{ //排除.和..
if (strcmp(logical_de->d_name, ".") == 0 ||
strcmp(logical_de->d_name, "..") == 0)
continue;
//验证slotname是否规范
/* if it cannot be a slot, skip the directory */
if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
continue;
/*
* ok, has to be a surviving logical slot, iterate and delete
* everything starting with xid-*
*/
ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
}
FreeDir(logical_dir);
}
while循环调用ReorderBufferCleanupSerializedTXNs,后面跟walsender启动清理逻辑就一样了。
pg_drop_replication_slot手动清理
drop slot清理逻辑跟自动清理spill文件的逻辑不一样,它没有调用到ReorderBufferCleanupSerializedTXNs。
drop slot流程如下:
pg_drop_replication_slot(PG_FUNCTION_ARGS)->ReplicationSlotDrop(const char *name, bool nowait)->ReplicationSlotDropAcquired(void)->ReplicationSlotDropPtr
ReplicationSlotDropPtr清理复制槽的逻辑也很有意思:
/*
* Permanently drop the replication slot which will be released by the point
* this function returns.
*/
static void
ReplicationSlotDropPtr(ReplicationSlot *slot)
{
char path[MAXPGPATH];
char tmppath[MAXPGPATH];
/*
* If some other backend ran this code concurrently with us, we might try
* to delete a slot with a certain name while someone else was trying to
* create a slot with the same name.
*/
LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
/* Generate pathnames. */
sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
/*
* Rename the slot directory on disk, so that we'll no longer recognize
* this as a valid slot. Note that if this fails, we've got to mark the
* slot inactive before bailing out. If we're dropping an ephemeral or a
* temporary slot, we better never fail hard as the caller won't expect
* the slot to survive and this might get called during error handling.
*/
if (rename(path, tmppath) == 0) //rename文件
{
/*
* We need to fsync() the directory we just renamed and its parent to
* make sure that our changes are on disk in a crash-safe fashion. If
* fsync() fails, we can't be sure whether the changes are on disk or
* not. For now, we handle that by panicking;
* StartupReplicationSlots() will try to straighten it out after
* restart.
*/
//fsync持久化
START_CRIT_SECTION();
fsync_fname(tmppath, true);
fsync_fname("pg_replslot", true);
END_CRIT_SECTION();
}
...
/*
* If removing the directory fails, the worst thing that will happen is
* that the user won't be able to create a new slot with the same name
* until the next server restart. We warn about it, but that's all.
*/
if (!rmtree(tmppath, true))
ereport(WARNING,
(errmsg("could not remove directory \"%s\"", tmppath)));
/*
* We release this at the very end, so that nobody starts trying to create
* a slot while we're still cleaning up the detritus of the old one.
*/
LWLockRelease(ReplicationSlotAllocationLock);
}
drop slot不是直接去复制槽目录下面去unlink,而是先把复制槽目录slotname/rename成 slotname.tmp/,然后再去做unlink目录下的文件,最后再删除 slotname.tmp/目录本身。
其中rmtree也是在循环unlink文件。
复制槽溢出发生后加速起库方案测试
1000w个spill删除起来肯定是很慢的,直接mv目录的话就非常快。但是直接mv需要注意mv后的名称和state文件,以及需要知道mv到底跳过了哪一个源码步骤。
mv的名称注意事项
由于是异常停库,startup进程会执行SyncDataDirectoryfsync和stat所有data文件,这一点是比较难绕过的。SyncDataDirectory做完以后,才开始处理复制槽。处理复制槽时会调用StartupReorderBuffer()->ReorderBufferCleanupSerializedTXNs全量清理spill文件。
在进入清理前,会调用ReplicationSlotValidateName校验复制槽名称的有效性,我们可以在ReplicationSlotValidateName上做文章,以骗过startup进程跳过ReorderBufferCleanupSerializedTXNs的过程。
ReplicationSlotValidateName规则:
bool
ReplicationSlotValidateName(const char *name, int elevel)
{
...
for (cp = name; *cp; cp++)
{ //关键规则在这里
if (!((*cp >= 'a' && *cp <= 'z')
|| (*cp >= '0' && *cp <= '9')
|| (*cp == '_')))
{
ereport(elevel,
(errcode(ERRCODE_INVALID_NAME),
errmsg("replication slot name \"%s\" contains invalid character",
name),
errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
return false;
}
}
return true;
}
有效slot name只包含a-z;0-9;_。
所以rename时建议加个点.,
-
建议
slotname.bak,slotname.20241215等。 -
不建议
slotnamebackup,slotname20241215,slotname_bak等等 -
不建议
.tmp后缀,slotname有.tmp后缀有特殊含义
最后rename后,要创建目录和拷贝state,不然启动的slot会表现的很反常(比如重复的slotname、自动生产一个slotname、删不到slot、下游起不来链路等等)。
汇总推荐mv操作如下:
cd pg_replslot mv slotname slotname.bak mkdir slotname cp slotname.bak/state slotname/
起库时间对比
对比不同源码流程起库速度,看看手工mv/rm加速起库到底有没有意义。
参考源码逻辑原理:
- 正常停库,走fsync和stat
- 异常停库,走fsync和stat;
- 有效mv,将slotname目录命名为
.bak,不走unlink - 无效mv,将slotname目录命名为
_bak且spill文件命名为xid开头,走unlink
由于正常spill文件实在太慢,这里手工伪造slot目录和spill文件,总共50个slot,每个slot 40w个spill,总共2000w个spill来测试起库时间(用cp目录的方式要比cp文件、dd文件快很多)。
| 编号 | 测试方案 | 起库时间 |
|---|---|---|
| 1 | 正常停库;起库不做fsync和stat,不做unlink | 0.1秒 |
| 2 | 正常停库,无效mv;起库不做fsync和stat,做unlink | 11分41秒 |
| 3 | 异常停库,有效mv;起库做fsync和stat,不做unlink | 4分35秒 |
| 4 | 异常停库,无效mv;起库做fsync和stat,做unlink | 32分2秒 |
| 5 | 异常停库,rm(创建slot目录并保留state) | 13分04秒 |
对比方案3、5,理论上当时的场景我们有效mv可以做到4分钟左右起库,rm的话13分钟左右。(这是一个粗糙的对比,恢复环境已经观察到有些东西不一样了)




