暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

PostgreSQL之进程分析

936
作者时间QQ技术交流群
perrynzhou@gmail.com2020/12/01672152841

基本介绍


什么是PostgreSQL?

  • PosgreSQL是一个 开源、对象关系的数据库系统。目前可以运行在Linux/Unix/Windows平台。支持ACID,内置INTEGER/NUMBERIC/BOOLEAN/CHAR/VARCHAR/DATE/INTERVAL/TIMESTAMP/binary larget objects等数据结构


PostgreSQL有些限制?



PostgreSQL有哪些核心功能?

  • MVCC

  • PITR 时间点恢复

  • 独立表空间和异步复制

  • Nested 事务和online/hot备份

  • 查询计划和优化器

  • 采用WAL机制保证可靠性


架构概览


数据库文件布局



  • base:存储数据库的目录,每个数据一个文件.下面是创建一个sampledb数据库,然后查看base下面的数据库对应的oid文件。sampledb的OID是163984,那么在base目录下就会有一个base/16384的数据库目录


$ psql  -h 127.0.0.1   -d postgres
postgres=# CREATE DATABASE sampledb OWNER perrynzhou;
CREATE DATABASE
postgres=# GRANT ALL PRIVILEGES ON DATABASE sampledb to perrynzhou;
GRANT
postgres=# select oid, datname from pg_database ;
oid | datname
-------+-----------
12974 | postgres
1 | template1
12973 | template0
16384 | sampledb
(4 rows)
postgres=#





  • global:数据库集群维度的系统表,比如pg_database等

  • pg_commit_ts: 这个目录包含了事务提交的时间戳数据

  • pg_dynshmem: 包含动态共享内存系统使用的文件

  • pg_logical:存储逻辑解码状态码

  • pg_multixact:存储多事务状态码数据

  • pg_notify:包含listen/notify的状态数据

  • pg_replslot:存储复制的slot数据

  • pg_serial:存储序列化事务的提交信息

  • pg_snapshots:存储导出快照的信息

  • pg_stat:包含静态子系统的持久化文件

  • pg_stat_tmp:包含静态系统的临时文件

  • pg_subtrans:包含子事务的状态码信息

  • pg_tblspc:每个表空间的符号链接信息

  • pg_twophase:包含事务准备阶段的状态文件

  • PG_VERSION:记录PostgreSQL的大版本号

  • pg_wal:包含wal的日志文件

  • pg_xact:包含事务提交的状态码信息



PostgreSQL的进程模型





  • PostMaster进程:当PostgreSQL启动后,Postmaster进程会第一个启动,这个进程是PG所有进程的父进程。该进程负责所有控制处理。外部发来的请求第一次都会请求到该进程,由该进程负责给每个请求初始化一个子进程服务。

  • 客户端进程:PostgreSQL启动后主服务的PostMaster主进程启动,负责监听postgresql.conf中的port端口同时初始化整个PostgreSQL的其他的内部进程,每个一个请求到这个端口,主进程会fork一个子进程,根据pg_hba.conf中的配置策略服务或者拒绝这个请求的处理。后续这个客户端子进程会接受SQL语句,然后拿到结果返回给请求的客户端。

  • background writer进程:write进程负责把数据写入到共享内存的cache,在合适的时间定期flush到磁盘。这个进程每次按照 postgresql.conf中的ngwriter_delay来控制每次写的频度;其次是PG在做常规checkpoint时候必须把所有的脏页flush到磁盘。


  • checkpointer 进程:检查点事务序列号,设置检查点可确保将检查点刷新到磁盘之前的日志信息,也就是说检查点之前的所有wal 日志在PG崩溃之后,是不需要进行恢复的,只需要从检查点之后的所有wal log进行日志恢复。

  • walwriter 进程:wal writer进程负责把wal cache中的日志数据在适合的时间点flush到Wal日志文件。wal log的核心思想是修改数据库文件之前必须先把更改数据记录到log,也就是说wal日志是先于数据文件写入的。


  • archiver 进程:归档进程负责把WAL 日志归档到归档路径中。如果是磁盘损坏,可以从这个归档日志中进行数据恢复。

  • stats collector 进程:该进程负责收集表和磁盘的访问的静态信息,包括表的添加、删除、更改的的数据、data block的数量、索引改变等静态信息,这些信息主要给PG的优化器使用,以便提供更优的执行计划,这些信息也会被autovacuum进程使用。

  • autovacuum launcher进程:在PG中数据表的UPDATE/DELETE操作不是立即删除旧版本数据而是标记为删除,这样做的目的是为了PG的MVCC.当事务提交,旧版本的数据不再需求了,这些数据需要清理腾出空间,这个清理的工作就是有autovacuum进程处理

  • wal sender/wal receiver进程:wal sender和wal receiver进程是PG实现streaming replication的基础。wal sender进程负责把WAL日志文件通过网络传输到send receiver节点;wal receiver进程负责结构wal log日志文件,同时从wal log进行数据恢复的操作。



 

PostgreSQL SQL执行流程




  • 从前端过来的SQL语句到了对应的服务端fork的客户端进程,经过SQL的词法、语法解析->SQL重写->生成物理执行计划->SQL语句的执行,最终该进程把结果返回给前端,来完成整个SQL语句的请求的生命周期。


// port是外部请求信息的封装
Port *port;
// 初始化port
port = ConnCreate(ListenSocket[i]);
// 启动一个用来处理外部请求的服务子进程
BackendStartup(port);

static int BackendStartup(Port *port)
{
Backend *bn; /* for backend cleanup */
pid_t pid;
pid = fork_process();
if (pid == 0) /* child */
{
free(bn);

/* Detangle from postmaster */
InitPostmasterChild();

/* Close the postmaster's sockets */
ClosePostmasterPorts(false);

/* Perform additional initialization and collect startup packet */
BackendInitialize(port);

// 在子进程中进行初始化和运行外部请求和子进程的交互
BackendRun(port);
}

}

static void BackendRun(Port *port)
{
char *av[2];
const int ac = 1;

av[0] = "postgres";
av[1] = NULL;

// 子进程的内存初始化
MemoryContextSwitchTo(TopMemoryContext);
// 启动一个子进程
PostgresMain(ac, av, port->database_name, port->user_name);
}


// PG用于处理外部请求的子进程入口函数
void PostgresMain(int argc, char *argv[],
const char *dbname,
const char *username)
{
int firstchar;
StringInfoData input_message;
sigjmp_buf local_sigjmp_buf;
volatile bool send_ready_for_query = true;
bool idle_in_transaction_timeout_enabled = false;
bool idle_session_timeout_enabled = false;

// 子进程的初始化
InitPostgres(dbname, InvalidOid, username, InvalidOid, NULL, false);

// 是否配置wal sender
if (am_walsender)
InitWalSender();

for (;;)
{

DoingCommandRead = true;
// 通过TCP读取客户端传过来的数据,该数据封装了PG客户端和该子进程的协议和SQL语句
firstchar = ReadCommand(&input_message);

DoingCommandRead = false;

// 根据读取到的第一个字符来判断请求类型
switch (firstchar)
{
// 简单SQL语句的处理
case 'Q': /* simple query */
{
const char *query_string;

query_string = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message);

if (am_walsender)
{
if (!exec_replication_command(query_string))
exec_simple_query(query_string);
}
else
exec_simple_query(query_string);

send_ready_for_query = true;
}
break;
// SQL语句的协议解析
case 'P': /* parse */
{
const char *stmt_name;
const char *query_string;
int numParams;
Oid *paramTypes = NULL;

forbidden_in_wal_sender(firstchar);

/* Set statement_timestamp() */
SetCurrentStatementStartTimestamp();

stmt_name = pq_getmsgstring(&input_message);
query_string = pq_getmsgstring(&input_message);
numParams = pq_getmsgint(&input_message, 2);
if (numParams > 0)
{
paramTypes = (Oid *) palloc(numParams * sizeof(Oid));
for (int i = 0; i < numParams; i++)
paramTypes[i] = pq_getmsgint(&input_message, 4);
}
pq_getmsgend(&input_message);

exec_parse_message(query_string, stmt_name,
paramTypes, numParams);
}
break;
case 'C': /* close */
{
int close_type;
const char *close_target;

forbidden_in_wal_sender(firstchar);

close_type = pq_getmsgbyte(&input_message);
close_target = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message);

switch (close_type)
{
case 'S':
if (close_target[0] != '\0')
DropPreparedStatement(close_target, false);
else
{
/* special-case the unnamed statement */
drop_unnamed_stmt();
}
break;
case 'P':
{
Portal portal;

portal = GetPortalByName(close_target);
if (PortalIsValid(portal))
PortalDrop(portal, false);
}
break;
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid CLOSE message subtype %d",
close_type)));
break;
}

if (whereToSendOutput == DestRemote)
pq_putemptymessage('3'); /* CloseComplete */
}
break;

case 'D': /* describe */
{
int describe_type;
const char *describe_target;

forbidden_in_wal_sender(firstchar);

/* Set statement_timestamp() (needed for xact) */
SetCurrentStatementStartTimestamp();

describe_type = pq_getmsgbyte(&input_message);
describe_target = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message);

switch (describe_type)
{
case 'S':
exec_describe_statement_message(describe_target);
break;
case 'P':
exec_describe_portal_message(describe_target);
break;
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid DESCRIBE message subtype %d",
describe_type)));
break;
}
}
break;

case 'H': /* flush */
pq_getmsgend(&input_message);
if (whereToSendOutput == DestRemote)
pq_flush();
break;

case 'S': /* sync */
pq_getmsgend(&input_message);
finish_xact_command();
send_ready_for_query = true;
break;

case EOF:

pgStatSessionEndCause = DISCONNECT_CLIENT_EOF;

case 'X':
if (whereToSendOutput == DestRemote)
whereToSendOutput = DestNone;

proc_exit(0);


}
}
}




PostgreSQL进程模型源码分析

  • PostgreSQL进程启动源码,整个逻辑实现是在ServerLoop函数中,PostmasterMain是PG启动的第一个进程,由它来实现其他的内部进程,其他的内部进程是在SeverLoop的实现如下


// pg_ctl -D /data/postgres/data -l logfile start执行以后,最终的入口函数就是PostmasterMain函数
void PostmasterMain(int argc, char *argv[])
{

status = ServerLoop();
}

}
static int ServerLoop(void)
{
fd_set readmask;
int nSockets;
time_t last_lockfile_recheck_time,
last_touch_time;
// 初始化数据库的监听socket
nSockets = initMasks(&readmask);
// 无限的死循环,不断的监听
for (;;)
{
fd_set rmask;
int selres;
time_t now;
// 这里采用select来监听来自外部的请求,这里有些疑惑为啥不采用epoll模型?
// select目前仅仅是循环找到哪些fd有IO事件吗,这个在高并发的场景下,效率相对比较低
selres = select(nSockets, &rmask, NULL, NULL, &timeout);
if (selres > 0)
{
int i;

for (i = 0; i < MAXLISTEN; i++)
{
if (ListenSocket[i] == PGINVALID_SOCKET)
break;
if (FD_ISSET(ListenSocket[i], &rmask))
{
// Port封装了此次连接的客户端信息
Port *port;
//
port = ConnCreate(ListenSocket[i]);
if (port)
{
// fork一个子进程,用来服务此次来自外部客户端的请求
BackendStartup(port);
StreamClose(port->sock);
ConnFree(port);
}
}
}
}
// 如果配置文件中启用了syslog,则启动一个log进程用来写入PG的日志
if (SysLoggerPID == 0 && Logging_collector)
SysLoggerPID = SysLogger_Start();


if (pmState == PM_RUN || pmState == PM_RECOVERY ||
pmState == PM_HOT_STANDBY)
{
// 启动checkpoint进程
if (CheckpointerPID == 0)
CheckpointerPID = StartCheckpointer();
// 启动后台的writer进程
if (BgWriterPID == 0)
BgWriterPID = StartBackgroundWriter();
}
// 启动wal writer进程
if (WalWriterPID == 0 && pmState == PM_RUN)
WalWriterPID = StartWalWriter();

if (!IsBinaryUpgrade && AutoVacPID == 0 &&
(AutoVacuumingActive() || start_autovac_launcher) &&
pmState == PM_RUN)
{
// 启动auto vacuum进程
AutoVacPID = StartAutoVacLauncher();
if (AutoVacPID != 0)
start_autovac_launcher = false; /* signal processed */
}

//启动statics collector进程
if (PgStatPID == 0 &&
(pmState == PM_RUN || pmState == PM_HOT_STANDBY))
PgStatPID = pgstat_start();
// 如果配置了archive进程,则启动该进程
if (PgArchPID == 0 && PgArchStartupAllowed())
PgArchPID = StartArchiver();

// 如果配置了复制,同时是slave节点,则启动receive进程
if (WalReceiverRequested)
MaybeStartWalReceiver();

//启动后台工作进程
if (StartWorkerNeeded || HaveCrashedWorker)
maybe_start_bgworkers();

now = time(NULL);

}



文章转载自存储内核技术交流,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论