
PostgreSQL Source Code Analysis: The Logger Process

The logging module is the part of PostgreSQL that records the system's runtime state and error information. It writes the key events that occur while the system runs into log files, so that administrators can troubleshoot problems and tune performance. In this article we look at how it is implemented.

Implementation approach

PostgreSQL uses a multi-process architecture, and every process produces its own log output, so the logging module has to bring the output of all these processes together in a way that is easy to manage and to search. Rather than giving each process its own log file, the logs of all processes are collected into a common log file; by reading that single file you can see what the whole system has been doing. To achieve this, the postmaster forks a dedicated logger process; every other process communicates with the logger through a pipe, sending its log messages into the pipe, and the logger process writes them to the log file.

Besides adding the logger process itself, we also need to define log levels and handle the sending side: a function that each process calls to write its log messages into the pipe.
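
To make the pattern concrete, here is a minimal standalone sketch (not PostgreSQL code; the log file name and the message are made up for the demo): a parent process redirects its stderr into a pipe, and a forked "collector" child drains the pipe into a single log file, which is exactly the division of labor between the backends and the logger process described above.

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>

int
main(void)
{
    int     pipefd[2];
    pid_t   pid;

    if (pipe(pipefd) < 0)
    {
        perror("pipe");
        exit(1);
    }

    pid = fork();
    if (pid < 0)
    {
        perror("fork");
        exit(1);
    }
    if (pid == 0)
    {
        /* child: the "logger" - only reads the pipe and appends to the log file */
        char    buf[4096];
        ssize_t n;
        int     logfd = open("demo.log", O_WRONLY | O_CREAT | O_APPEND, 0644);

        close(pipefd[1]);               /* the reader does not need the write end */
        while ((n = read(pipefd[0], buf, sizeof(buf))) > 0)
            write(logfd, buf, n);       /* n == 0 (EOF) means all writers have exited */
        close(logfd);
        exit(0);
    }

    /* parent: redirect stderr into the pipe, then simply log through stderr */
    close(pipefd[0]);                   /* the writer does not need the read end */
    dup2(pipefd[1], STDERR_FILENO);
    close(pipefd[1]);

    fprintf(stderr, "hello from pid %d\n", (int) getpid());
    fflush(stderr);
    return 0;                           /* exiting closes stderr, so the child sees EOF */
}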

Source code analysis of the logger process

  1. Starting the logger process: the postmaster forks a logger child process. The call stack when the logger starts up is:
SysLoggerMain(int argc, char ** argv) (src\backend\postmaster\syslogger.c:303)
SysLogger_Start() (src\backend\postmaster\syslogger.c:652)
PostmasterMain(int argc, char ** argv) (src\backend\postmaster\postmaster.c:1113)
main(int argc, char ** argv) (src\backend\main\main.c:209)

The main code lives in src/backend/postmaster/syslogger.c. The other processes communicate with the logger process through a pipe: the logger only reads from the pipe, while the other processes write their log messages into it.

int syslogPipe[2] = {-1, -1};   /* the pipe, as a pair of file descriptors */

/* Start the logger process */
int
SysLogger_Start(void)
{
    if (syslogPipe[0] < 0)      /* create the pipe if it has not been created yet */
    {
        if (pipe(syslogPipe) < 0)
            ereport(FATAL,
                    (errcode_for_socket_access(),
                     errmsg("could not create pipe for syslog: %m")));
    }

    /* create the log directory if it does not exist */
    (void) MakePGDirectory(Log_directory);

    first_syslogger_file_time = time(NULL);
    filename = logfile_getname(first_syslogger_file_time, NULL);
    syslogFile = logfile_open(filename, "a", false);
    pfree(filename);

    switch ((sysloggerPid = fork_process()))    /* fork the logger child process */
    {
        case -1:
            ereport(LOG,
                    (errmsg("could not fork system logger: %m")));
            return 0;

        case 0:
            /* logger child process */
            /* in postmaster child ... */
            InitPostmasterChild();

            /* Close the postmaster's sockets */
            ClosePostmasterPorts(true);

            /* Drop our connection to postmaster's shared memory, as well */
            dsm_detach_all();
            PGSharedMemoryDetach();

            /* do the work */
            SysLoggerMain(0, NULL);
            break;

        default:
            /* postmaster process */
            /* success, in postmaster */

            /* redirect stdout and stderr into the pipe */
            /* now we redirect stderr, if not done already */
            if (!redirection_done)
            {
                ereport(LOG,
                        (errmsg("redirecting log output to logging collector process"),
                         errhint("Future log output will appear in directory \"%s\".",
                                 Log_directory)));

                fflush(stdout);
                if (dup2(syslogPipe[1], fileno(stdout)) < 0)
                    ereport(FATAL,
                            (errcode_for_file_access(),
                             errmsg("could not redirect stdout: %m")));
                fflush(stderr);
                if (dup2(syslogPipe[1], fileno(stderr)) < 0)
                    ereport(FATAL,
                            (errcode_for_file_access(),
                             errmsg("could not redirect stderr: %m")));

                /* Now we are done with the write end of the pipe. */
                close(syslogPipe[1]);
                syslogPipe[1] = -1;

                redirection_done = true;
            }

            /* postmaster will never write the file(s); close 'em */
            fclose(syslogFile);
            syslogFile = NULL;
            if (csvlogFile != NULL)
            {
                fclose(csvlogFile);
                csvlogFile = NULL;
            }
            return (int) sysloggerPid;
    }

    /* we should never reach here */
    return 0;
}

/* The logger child reads the pipe and writes the log data it reads to the log file */
void
SysLoggerMain(int argc, char *argv[])
{
    if (syslogPipe[1] >= 0)
        close(syslogPipe[1]);
    syslogPipe[1] = -1;     /* the logger only reads the pipe, so close the write end */

    last_file_name = logfile_getname(first_syslogger_file_time, NULL);

    /* remember active logfile parameters */
    currentLogDir = pstrdup(Log_directory);
    currentLogFilename = pstrdup(Log_filename);
    currentLogRotationAge = Log_RotationAge;
    /* set next planned rotation time */
    set_next_rotation_time();
    update_metainfo_datafile();

    wes = CreateWaitEventSet(CurrentMemoryContext, 2);
    AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
    AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL);

    /* main worker loop */
    for (;;)
    {
        /* Clear any already-pending wakeups */
        ResetLatch(MyLatch);

        /* handle a configuration file reload */
        if (ConfigReloadPending)
        {
            ConfigReloadPending = false;
            ProcessConfigFile(PGC_SIGHUP);
            /* react to changes of the logging-related GUC parameters */
            // ...
        }

        rc = WaitEventSetWait(wes, cur_timeout, &event, 1,
                              WAIT_EVENT_SYSLOGGER_MAIN);

        if (rc == 1 && event.events == WL_SOCKET_READABLE)
        {
            int         bytesRead;

            /* read from the pipe */
            bytesRead = read(syslogPipe[0],
                             logbuffer + bytes_in_logbuffer,
                             sizeof(logbuffer) - bytes_in_logbuffer);
            if (bytesRead < 0)
            {
                if (errno != EINTR)
                    ereport(LOG,
                            (errcode_for_socket_access(),
                             errmsg("could not read from logger pipe: %m")));
            }
            else if (bytesRead > 0)
            {
                /* process the log data read from the pipe and write it to the log file */
                bytes_in_logbuffer += bytesRead;
                process_pipe_input(logbuffer, &bytes_in_logbuffer);
                continue;
            }
            else
            {
                /*
                 * Zero bytes read when select() is saying read-ready means
                 * EOF on the pipe: that is, there are no longer any processes
                 * with the pipe write end open.  Therefore, the postmaster
                 * and all backends are shut down, and we are done.
                 */
                pipe_eof_seen = true;

                /* if there's any data left then force it out now */
                flush_pipe_input(logbuffer, &bytes_in_logbuffer);
            }
        }
    }
}

Concretely, a protocol is also needed so that the log entries sent through the pipe can be parsed and reassembled by the reader. The call stack for reading data from the pipe and writing it to the log file is as follows:

libc.so.6!__GI__IO_fwrite(const void * buf, size_t size, size_t count, FILE * fp) (iofwrite.c:31)
write_syslogger_file(const char * buffer, int count, int destination) (src\backend\postmaster\syslogger.c:1099)
process_pipe_input(char * logbuffer, int * bytes_in_logbuffer) (src\backend\postmaster\syslogger.c:981)
SysLoggerMain(int argc, char ** argv) (src\backend\postmaster\syslogger.c:479)
SysLogger_Start() (src\backend\postmaster\syslogger.c:652)
PostmasterMain(int argc, char ** argv) (src\backend\postmaster\postmaster.c:1113)
main(int argc, char ** argv) (src\backend\main\main.c:209)

The protocol is defined as follows:

/*
 * Primitive protocol structure for writing to syslogger pipe(s).  The idea
 * here is to divide long messages into chunks that are not more than
 * PIPE_BUF bytes long, which according to POSIX spec must be written into
 * the pipe atomically.  The pipe reader then uses the protocol headers to
 * reassemble the parts of a message into a single string.  The reader can
 * also cope with non-protocol data coming down the pipe, though we cannot
 * guarantee long strings won't get split apart.
 *
 * We use non-nul bytes in is_last to make the protocol a tiny bit
 * more robust against finding a false double nul byte prologue.  But
 * we still might find it in the len and/or pid bytes unless we're careful.
 */
#ifdef PIPE_BUF
/* Are there any systems with PIPE_BUF > 64K?  Unlikely, but ... */
#if PIPE_BUF > 65536
#define PIPE_CHUNK_SIZE  65536
#else
#define PIPE_CHUNK_SIZE  ((int) PIPE_BUF)
#endif
#else                           /* not defined */
/* POSIX says the value of PIPE_BUF must be at least 512, so use that */
#define PIPE_CHUNK_SIZE  512
#endif

typedef struct
{
    char        nuls[2];        /* always \0\0 */
    uint16      len;            /* size of this chunk (counts data only) */
    int32       pid;            /* writer's pid */
    char        is_last;        /* last chunk of message? 't' or 'f' ('T' or
                                 * 'F' for CSV case) */
    char        data[FLEXIBLE_ARRAY_MEMBER];    /* data payload starts here */
} PipeProtoHeader;

typedef union
{
    PipeProtoHeader proto;
    char        filler[PIPE_CHUNK_SIZE];
} PipeProtoChunk;

#define PIPE_HEADER_SIZE  offsetof(PipeProtoHeader, data)
#define PIPE_MAX_PAYLOAD  ((int) (PIPE_CHUNK_SIZE - PIPE_HEADER_SIZE))

Because the communication relies on a pipe, PIPE_BUF matters: it is the maximum number of bytes the kernel guarantees to write to a pipe atomically (4096 bytes on Linux; POSIX only requires 512, and some systems use other values). If a log entry exceeds this size, it is split into multiple chunks for transmission.
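
As a quick sanity check of the chunking arithmetic, here is a small sketch (not PostgreSQL code; it assumes Linux's PIPE_BUF of 4096 and a 9-byte header, i.e. offsetof(PipeProtoHeader, data) with no struct padding) that computes how many chunks a long message needs:

#include <stdio.h>

int
main(void)
{
    int     chunk_size = 4096;          /* assumed PIPE_BUF on Linux */
    int     header_size = 9;            /* assumed 2 + 2 + 4 + 1 bytes, no struct padding */
    int     max_payload = chunk_size - header_size;
    int     msglen = 10000;             /* a hypothetical long log entry */
    int     nchunks = (msglen + max_payload - 1) / max_payload;

    /* each chunk (header + payload) fits in PIPE_BUF, so the kernel writes it atomically */
    printf("%d-byte message -> %d chunks, at most %d payload bytes each\n",
           msglen, nchunks, max_payload);
    return 0;
}

On the reader side, these chunks are detected and reassembled by process_pipe_input(), shown next: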

/* --------------------------------
 *      pipe protocol handling
 * --------------------------------
 */

/*
 * Process data received through the syslogger pipe.
 *
 * This routine interprets the log pipe protocol which sends log messages as
 * (hopefully atomic) chunks - such chunks are detected and reassembled here.
 *
 * The protocol has a header that starts with two nul bytes, then has a 16 bit
 * length, the pid of the sending process, and a flag to indicate if it is
 * the last chunk in a message. Incomplete chunks are saved until we read some
 * more, and non-final chunks are accumulated until we get the final chunk.
 *
 * All of this is to avoid 2 problems:
 * . partial messages being written to logfiles (messes rotation), and
 * . messages from different backends being interleaved (messages garbled).
 *
 * Any non-protocol messages are written out directly. These should only come
 * from non-PostgreSQL sources, however (e.g. third party libraries writing to
 * stderr).
 *
 * logbuffer is the data input buffer, and *bytes_in_logbuffer is the number
 * of bytes present.  On exit, any not-yet-eaten data is left-justified in
 * logbuffer, and *bytes_in_logbuffer is updated.
 */
static void
process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
{
    char       *cursor = logbuffer;
    int         count = *bytes_in_logbuffer;
    int         dest = LOG_DESTINATION_STDERR;

    /* While we have enough for a header, process data... */
    while (count >= (int) (offsetof(PipeProtoHeader, data) + 1))
    {
        PipeProtoHeader p;
        int         chunklen;

        /* Do we have a valid header? */
        memcpy(&p, cursor, offsetof(PipeProtoHeader, data));
        if (p.nuls[0] == '\0' && p.nuls[1] == '\0' &&
            p.len > 0 && p.len <= PIPE_MAX_PAYLOAD &&
            p.pid != 0 &&
            (p.is_last == 't' || p.is_last == 'f' ||
             p.is_last == 'T' || p.is_last == 'F'))
        {
            List       *buffer_list;
            ListCell   *cell;
            save_buffer *existing_slot = NULL,
                       *free_slot = NULL;
            StringInfo  str;

            chunklen = PIPE_HEADER_SIZE + p.len;

            /* Fall out of loop if we don't have the whole chunk yet */
            if (count < chunklen)
                break;

            dest = (p.is_last == 'T' || p.is_last == 'F') ?
                LOG_DESTINATION_CSVLOG : LOG_DESTINATION_STDERR;

            /* Locate any existing buffer for this source pid */
            buffer_list = buffer_lists[p.pid % NBUFFER_LISTS];
            foreach(cell, buffer_list)
            {
                save_buffer *buf = (save_buffer *) lfirst(cell);

                if (buf->pid == p.pid)
                {
                    existing_slot = buf;
                    break;
                }
                if (buf->pid == 0 && free_slot == NULL)
                    free_slot = buf;
            }

            if (p.is_last == 'f' || p.is_last == 'F')
            {
                /*
                 * Save a complete non-final chunk in a per-pid buffer
                 */
                if (existing_slot != NULL)
                {
                    /* Add chunk to data from preceding chunks */
                    str = &(existing_slot->data);
                    appendBinaryStringInfo(str,
                                           cursor + PIPE_HEADER_SIZE, p.len);
                }
                else
                {
                    /* First chunk of message, save in a new buffer */
                    if (free_slot == NULL)
                    {
                        /*
                         * Need a free slot, but there isn't one in the list,
                         * so create a new one and extend the list with it.
                         */
                        free_slot = palloc(sizeof(save_buffer));
                        buffer_list = lappend(buffer_list, free_slot);
                        buffer_lists[p.pid % NBUFFER_LISTS] = buffer_list;
                    }
                    free_slot->pid = p.pid;
                    str = &(free_slot->data);
                    initStringInfo(str);
                    appendBinaryStringInfo(str,
                                           cursor + PIPE_HEADER_SIZE, p.len);
                }
            }
            else
            {
                /*
                 * Final chunk --- add it to anything saved for that pid, and
                 * either way write the whole thing out.
                 */
                if (existing_slot != NULL)
                {
                    str = &(existing_slot->data);
                    appendBinaryStringInfo(str,
                                           cursor + PIPE_HEADER_SIZE, p.len);
                    write_syslogger_file(str->data, str->len, dest);
                    /* Mark the buffer unused, and reclaim string storage */
                    existing_slot->pid = 0;
                    pfree(str->data);
                }
                else
                {
                    /* The whole message was one chunk, evidently. */
                    write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len,
                                         dest);
                }
            }

            /* Finished processing this chunk */
            cursor += chunklen;
            count -= chunklen;
        }
        else
        {
            /* Process non-protocol data */

            /*
             * Look for the start of a protocol header.  If found, dump data
             * up to there and repeat the loop.  Otherwise, dump it all and
             * fall out of the loop.  (Note: we want to dump it all if at all
             * possible, so as to avoid dividing non-protocol messages across
             * logfiles.  We expect that in many scenarios, a non-protocol
             * message will arrive all in one read(), and we want to respect
             * the read() boundary if possible.)
             */
            for (chunklen = 1; chunklen < count; chunklen++)
            {
                if (cursor[chunklen] == '\0')
                    break;
            }
            /* fall back on the stderr log as the destination */
            write_syslogger_file(cursor, chunklen, LOG_DESTINATION_STDERR);
            cursor += chunklen;
            count -= chunklen;
        }
    }

    /* We don't have a full chunk, so left-align what remains in the buffer */
    if (count > 0 && cursor != logbuffer)
        memmove(logbuffer, cursor, count);
    *bytes_in_logbuffer = count;
}

void
write_syslogger_file(const char *buffer, int count, int destination)
{
    int         rc;
    FILE       *logfile;

    /*
     * If we're told to write to csvlogFile, but it's not open, dump the data
     * to syslogFile (which is always open) instead.  This can happen if CSV
     * output is enabled after postmaster start and we've been unable to open
     * csvlogFile.  There are also race conditions during a parameter change
     * whereby backends might send us CSV output before we open csvlogFile or
     * after we close it.  Writing CSV-formatted output to the regular log
     * file isn't great, but it beats dropping log output on the floor.
     *
     * Think not to improve this by trying to open csvlogFile on-the-fly. Any
     * failure in that would lead to recursion.
     */
    logfile = (destination == LOG_DESTINATION_CSVLOG &&
               csvlogFile != NULL) ? csvlogFile : syslogFile;

    rc = fwrite(buffer, 1, count, logfile);

    /*
     * Try to report any failure.  We mustn't use ereport because it would
     * just recurse right back here, but write_stderr is OK: it will write
     * either to the postmaster's original stderr, or to /dev/null, but never
     * to our input pipe which would result in a different sort of looping.
     */
    if (rc != count)
        write_stderr("could not write to log file: %s\n",
                     strerror(errno));
}

Source code analysis of elog

This part of the code lives mainly in elog.h and elog.c. It defines the log levels, the logging functions, and the structure of a log entry. This is the write side of the log pipe; the logger process is the read side, which writes what it reads to the log file.

When a process needs to emit a log message, it calls ereport, which is defined as follows:

#define ereport_domain(elevel, domain, ...) \
    do { \
        const int elevel_ = (elevel); \
        pg_prevent_errno_in_scope(); \
        if (errstart(elevel_, domain)) \
            __VA_ARGS__, errfinish(__FILE__, __LINE__, PG_FUNCNAME_MACRO); \
        if (elevel_ >= ERROR) \
            pg_unreachable(); \
    } while(0)

#define ereport(elevel, ...)    \
    ereport_domain(elevel, TEXTDOMAIN, __VA_ARGS__)
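
For reference, a call site looks roughly like this (an illustrative example, not taken verbatim from the PostgreSQL sources):

/* illustrative ereport() call: emit a LOG-level message with a detail line */
ereport(LOG,
        (errmsg("demo: processed %d rows", 42),
         errdetail("errstart() builds the ErrorData; errfinish() routes it to the "
                   "configured destinations, including the syslogger pipe.")));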

Taking an arbitrary logging call as an example, the call stack is as follows:

write_pipe_chunks(char * data, int len, int dest) (src\backend\utils\error\elog.c:3290)
send_message_to_server_log(ErrorData * edata) (src\backend\utils\error\elog.c:3191)
EmitErrorReport() (src\backend\utils\error\elog.c:1546)
errfinish(const char * filename, int lineno, const char * funcname) (src\backend\utils\error\elog.c:597)
ShowUsage(const char * title) (src\backend\tcop\postgres.c:4977)
exec_simple_query(const char * query_string) (src\backend\tcop\postgres.c:1321)
PostgresMain(int argc, char ** argv, const char * dbname, const char * username) (src\backend\tcop\postgres.c:4577)
BackendRun(Port * port) (src\backend\postmaster\postmaster.c:4540)
BackendStartup(Port * port) (src\backend\postmaster\postmaster.c:4262)
ServerLoop() (src\backend\postmaster\postmaster.c:1748)
PostmasterMain(int argc, char ** argv) (src\backend\postmaster\postmaster.c:1420)
main(int argc, char ** argv) (src\backend\main\main.c:209)

write_pipe_chunks sends the message to the logger process through the pipe:

/* Send data to the syslogger using the chunked protocol */
static void
write_pipe_chunks(char *data, int len, int dest)
{
    PipeProtoChunk p;
    int         fd = fileno(stderr);
    int         rc;

    Assert(len > 0);

    p.proto.nuls[0] = p.proto.nuls[1] = '\0';
    p.proto.pid = MyProcPid;

    /* write all but the last chunk */
    while (len > PIPE_MAX_PAYLOAD)
    {
        p.proto.is_last = (dest == LOG_DESTINATION_CSVLOG ? 'F' : 'f');
        p.proto.len = PIPE_MAX_PAYLOAD;
        memcpy(p.proto.data, data, PIPE_MAX_PAYLOAD);
        rc = write(fd, &p, PIPE_HEADER_SIZE + PIPE_MAX_PAYLOAD);
        (void) rc;
        data += PIPE_MAX_PAYLOAD;
        len -= PIPE_MAX_PAYLOAD;
    }

    /* write the last chunk */
    p.proto.is_last = (dest == LOG_DESTINATION_CSVLOG ? 'T' : 't');
    p.proto.len = len;
    memcpy(p.proto.data, data, len);
    rc = write(fd, &p, PIPE_HEADER_SIZE + len);
    (void) rc;
}