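This article in the compute engine series walks through the source code of the XLOG read and write mechanisms. It has four parts: 1) the xlog record format, 2) xlog buffer allocation and initialization, 3) the xlog read path, and 4) the xlog insert path.
He3DB is a cloud-native database that separates compute from storage. It combines compute/storage separation, hot/cold data tiering with compression, and intelligent middleware to deliver high performance at low cost, helping customers cut database costs as far as possible without giving up performance.
xlog record format: source code overview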

DETAIL DESCRIPTION
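- Each xlog record may contain 0 to N block data entries, each with its own metadata.
- If an xlog record has main data, its length is stored as a uint32 when it is greater than 255, and as a uint8 otherwise.
- If an xlog record has main data, it is registered through XLogRegisterData(...) onto the XLogRecData chain headed by mainrdata_head.
- If an xlog record has block data, it is registered through XLogRegisterBuffer(...) into the registered_buffers array.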
xlog buffer allocation and initialization: source code overview

DETAIL DESCRIPTION
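- XLOGShmemSize: computes the size of the xlog buffer, which is governed by the wal_buffers database parameter.
- XLOGShmemInit: initializes the xlog buffer. The global variable ControlFile caches the contents of the control file. The global variable XLogCtl is the management structure for the xlog buffer; it covers the buffer indexes, the spinlock, the buffer space itself, and so on. Within it, LogwrtRqst holds the positions up to which an xlog write and flush have been requested, and LogwrtResult holds the positions up to which the write and flush have already completed.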
xlog read path: source code walkthrough

DETAIL DESCRIPTION
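- XLogBeginRead: sets RecPtr, the starting point for reading xlog. RecPtr may be the start of a valid xlog record or the start of a page. This call only records the position and never raises an error itself; if RecPtr is invalid, the error surfaces later, while reading.
- ReadRecord: PostgreSQL keeps a reader state variable, state, to coordinate xlog reading. ReadRecord reads the next xlog record, and XLogBeginRead() must be called before its first use. It returns NULL when no more xlog is available. In standby mode it keeps retrying until it obtains a valid xlog record.
- XLogReadRecord: reads one xlog record, the one located at state->EndRecPtr. A record may span pages, in which case its data has to be reassembled.
- ReadPageInternal: reads the target xlog page.
- XLogPageRead: calls the file interface to read the target xlog page.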
xlog insert path: source code walkthrough

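XLOG insertion has three parts: 1) registering the xlog data, 2) assembling the xlog record, and 3) inserting the record into the xlog buffer. Record assembly is the most involved of the three and is implemented by the function XLogRecordAssemble, which is walked through below.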

XLogRecordAssemble assembles, in order, the xlog record header, the block data headers, the main data header, the block data, and the main data. The implementation is as follows:
static XLogRecData *
XLogRecordAssemble(RmgrId rmid, uint8 info,
                   XLogRecPtr RedoRecPtr, bool doPageWrites,
                   XLogRecPtr *fpw_lsn, int *num_fpi)
{
    XLogRecData *rdt;
    // Total length of the data chain
    uint32      total_len = 0;
    int         block_id;
    pg_crc32c   rdata_crc;
    registered_buffer *prev_regbuf = NULL;
    XLogRecData *rdt_datas_last;
    XLogRecord *rechdr;
    char       *scratch = hdr_scratch;

    /*
     * Note: this function can be called multiple times for the same record.
     * All the modifications we do to the rdata chains below must handle that.
     */

    /* The record begins with the fixed-size header */
    // rechdr is the xlog header (the XLogRecord header); it holds the metadata of one xlog record
    rechdr = (XLogRecord *) scratch;
    // The XLogRecord header is followed by the metadata for buffer data, full-page images, compression, and main data
    scratch += SizeOfXLogRecord;

    hdr_rdt.next = NULL;
    rdt_datas_last = &hdr_rdt;
    hdr_rdt.data = hdr_scratch;

    /*
     * Enforce consistency checks for this record if user is looking for it.
     * Do this before at the beginning of this routine to give the possibility
     * for callers of XLogInsert() to pass XLR_CHECK_CONSISTENCY directly for
     * a record.
     */
    if (wal_consistency_checking[rmid])
        info |= XLR_CHECK_CONSISTENCY;

    /*
     * Make an rdata chain containing all the data portions of all block
     * references. This includes the data for full-page images. Also append
     * the headers for the block references in the scratch buffer.
     */
    *fpw_lsn = InvalidXLogRecPtr;
    // Matching the record format described earlier: for every registered buffer,
    // build its headers and add its buffer data or full-page image to the chain
    for (block_id = 0; block_id < max_registered_block_id; block_id++)
    {
        registered_buffer *regbuf = &registered_buffers[block_id];
        bool        needs_backup;
        bool        needs_data;
        XLogRecordBlockHeader bkpb;
        XLogRecordBlockImageHeader bimg;
        XLogRecordBlockCompressHeader cbimg = {0};
        bool        samerel;
        bool        is_compressed = false;
        bool        include_image;

        if (!regbuf->in_use)
            continue;

        /* Determine if this block needs to be backed up */
        // Decide whether a full-page backup is needed
        if (regbuf->flags & REGBUF_FORCE_IMAGE)
            needs_backup = true;
        else if (regbuf->flags & REGBUF_NO_IMAGE)
            needs_backup = false;
        else if (!doPageWrites)
            needs_backup = false;
        else
        {
            /*
             * We assume page LSN is first data on *every* page that can be
             * passed to XLogInsert, whether it has the standard page layout
             * or not.
             */
            XLogRecPtr  page_lsn = PageGetLSN(regbuf->page);

            needs_backup = (page_lsn <= RedoRecPtr);
            if (!needs_backup)
            {
                if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
                    *fpw_lsn = page_lsn;
            }
        }

        /* Determine if the buffer data needs to included */
        // Decide whether the data registered for this buffer must be included
        if (regbuf->rdata_len == 0)
            needs_data = false;
        else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
            needs_data = true;
        else
            needs_data = !needs_backup;

        // Fill in the XLogRecordBlockHeader for this buffer
        bkpb.id = block_id;
        bkpb.fork_flags = regbuf->forkno;
        bkpb.data_length = 0;

        if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
            bkpb.fork_flags |= BKPBLOCK_WILL_INIT;

        /*
         * If needs_backup is true or WAL checking is enabled for current
         * resource manager, log a full-page write for the current block.
         */
        // Include a full-page image if a backup is needed or WAL consistency
        // checking is enabled for this record's resource manager
        include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;

        // full page write
        if (include_image)
        {
            Page        page = regbuf->page;
            uint16      compressed_len = 0;

            /*
             * The page needs to be backed up, so calculate its hole length
             * and offset.
             */
            // Work out the offset and size of the hole in the page
            if (regbuf->flags & REGBUF_STANDARD)
            {
                /* Assume we can omit data between pd_lower and pd_upper */
                uint16      lower = ((PageHeader) page)->pd_lower;
                uint16      upper = ((PageHeader) page)->pd_upper;

                if (lower >= SizeOfPageHeaderData &&
                    upper > lower &&
                    upper <= BLCKSZ)
                {
                    bimg.hole_offset = lower;
                    cbimg.hole_length = upper - lower;
                }
                else
                {
                    /* No "hole" to remove */
                    bimg.hole_offset = 0;
                    cbimg.hole_length = 0;
                }
            }
            else
            {
                /* Not a standard page header, don't try to eliminate "hole" */
                bimg.hole_offset = 0;
                cbimg.hole_length = 0;
            }

            /*
             * Try to compress a block image if wal_compression is enabled
             */
            // Compress the page data
            if (wal_compression)
            {
                is_compressed =
                    XLogCompressBackupBlock(page, bimg.hole_offset,
                                            cbimg.hole_length,
                                            regbuf->compressed_page,
                                            &compressed_len);
            }

            /*
             * Fill in the remaining fields in the XLogRecordBlockHeader
             * struct
             */
            bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;

            /* Report a full page image constructed for the WAL record */
            *num_fpi += 1;

            /*
             * Construct XLogRecData entries for the page content.
             */
            rdt_datas_last->next = &regbuf->bkp_rdatas[0];
            rdt_datas_last = rdt_datas_last->next;

            bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;

            /*
             * If WAL consistency checking is enabled for the resource manager
             * of this WAL record, a full-page image is included in the record
             * for the block modified. During redo, the full-page is replayed
             * only if BKPIMAGE_APPLY is set.
             */
            if (needs_backup)
                bimg.bimg_info |= BKPIMAGE_APPLY;

            // Add the page image to the chain
            if (is_compressed)
            {
                bimg.length = compressed_len;
                bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;

                rdt_datas_last->data = regbuf->compressed_page;
                rdt_datas_last->len = compressed_len;
            }
            else
            {
                bimg.length = BLCKSZ - cbimg.hole_length;

                if (cbimg.hole_length == 0)
                {
                    // With a hole length of 0, record the complete page
                    rdt_datas_last->data = page;
                    rdt_datas_last->len = BLCKSZ;
                }
                else
                {
                    /* must skip the hole */
                    // With a hole, add the parts before and after it as two chain entries
                    rdt_datas_last->data = page;
                    rdt_datas_last->len = bimg.hole_offset;

                    rdt_datas_last->next = &regbuf->bkp_rdatas[1];
                    rdt_datas_last = rdt_datas_last->next;

                    rdt_datas_last->data =
                        page + (bimg.hole_offset + cbimg.hole_length);
                    rdt_datas_last->len =
                        BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
                }
            }

            total_len += bimg.length;
        }

        // Add the buffer data to the chain
        if (needs_data)
        {
            /*
             * Link the caller-supplied rdata chain for this buffer to the
             * overall list.
             */
            bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
            bkpb.data_length = regbuf->rdata_len;
            total_len += regbuf->rdata_len;

            rdt_datas_last->next = regbuf->rdata_head;
            rdt_datas_last = regbuf->rdata_tail;
        }

        if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
        {
            samerel = true;
            bkpb.fork_flags |= BKPBLOCK_SAME_REL;
        }
        else
            samerel = false;
        prev_regbuf = regbuf;

        /* Ok, copy the header to the scratch buffer */
        // Write this buffer's XLogRecordBlockHeader
        memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
        scratch += SizeOfXLogRecordBlockHeader;
        // Write this buffer's XLogRecordBlockImageHeader
        if (include_image)
        {
            memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
            scratch += SizeOfXLogRecordBlockImageHeader;
            // Write this buffer's XLogRecordBlockCompressHeader
            if (cbimg.hole_length != 0 && is_compressed)
            {
                memcpy(scratch, &cbimg,
                       SizeOfXLogRecordBlockCompressHeader);
                scratch += SizeOfXLogRecordBlockCompressHeader;
            }
        }
        // Write this buffer's RelFileNode, unless it matches the previous block's
        if (!samerel)
        {
            memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
            scratch += sizeof(RelFileNode);
        }
        // Write this buffer's BlockNumber
        memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
        scratch += sizeof(BlockNumber);
    }

    /* followed by the record's origin, if any */
    if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
        replorigin_session_origin != InvalidRepOriginId)
    {
        *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
        memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
        scratch += sizeof(replorigin_session_origin);
    }

    /* followed by toplevel XID, if not already included in previous record */
    if (IsSubTransactionAssignmentPending())
    {
        TransactionId xid = GetTopTransactionIdIfAny();

        /* update the flag (later used by XLogResetInsertion) */
        XLogSetRecordFlags(XLOG_INCLUDE_XID);

        *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
        memcpy(scratch, &xid, sizeof(TransactionId));
        scratch += sizeof(TransactionId);
    }

    /* followed by main data, if any */
    // Add the registered main data to the chain
    if (mainrdata_len > 0)
    {
        if (mainrdata_len > 255)
        {
            *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
            memcpy(scratch, &mainrdata_len, sizeof(uint32));
            scratch += sizeof(uint32);
        }
        else
        {
            *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
            *(scratch++) = (uint8) mainrdata_len;
        }
        rdt_datas_last->next = mainrdata_head;
        rdt_datas_last = mainrdata_last;
        total_len += mainrdata_len;
    }
    rdt_datas_last->next = NULL;

    hdr_rdt.len = (scratch - hdr_scratch);
    total_len += hdr_rdt.len;

    /*
     * Calculate CRC of the data
     *
     * Note that the record header isn't added into the CRC initially since we
     * don't know the prev-link yet. Thus, the CRC will represent the CRC of
     * the whole record in the order: rdata, then backup blocks, then record
     * header.
     */
    // Compute the checksum
    INIT_CRC32C(rdata_crc);
    COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
    for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
        COMP_CRC32C(rdata_crc, rdt->data, rdt->len);

    /*
     * Fill in the fields in the record header. Prev-link is filled in later,
     * once we know where in the WAL the record will be inserted. The CRC does
     * not include the record header yet.
     */
    // Fill in the xlog header metadata (the XLogRecord header)
    rechdr->xl_xid = GetCurrentTransactionIdIfAny();
    rechdr->xl_tot_len = total_len;
    rechdr->xl_info = info;
    rechdr->xl_rmid = rmid;
    rechdr->xl_prev = InvalidXLogRecPtr;
    rechdr->xl_crc = rdata_crc;

    return &hdr_rdt;
}

The XLogInsertRecord function inserts the xlog record assembled above into the xlog buffer:
XLogRecPtr
XLogInsertRecord(XLogRecData *rdata,
                 XLogRecPtr fpw_lsn,
                 uint8 flags,
                 int num_fpi)
{
    // XLogCtl is the management structure for the xlog buffer
    XLogCtlInsert *Insert = &XLogCtl->Insert;
    pg_crc32c   rdata_crc;
    bool        inserted;
    // The XLogRecord header
    XLogRecord *rechdr = (XLogRecord *) rdata->data;
    uint8       info = rechdr->xl_info & ~XLR_INFO_MASK;
    // Whether this is an XLOG_SWITCH record, which forces a switch to a new segment
    bool        isLogSwitch = (rechdr->xl_rmid == RM_XLOG_ID &&
                               info == XLOG_SWITCH);
    XLogRecPtr  StartPos;
    XLogRecPtr  EndPos;
    bool        prevDoPageWrites = doPageWrites;

    /* we assume that all of the record header is in the first chunk */
    Assert(rdata->len >= SizeOfXLogRecord);

    /* cross-check on whether we should be here or not */
    if (!XLogInsertAllowed())
        elog(ERROR, "cannot make new WAL entries during recovery");

    // Acquire an xlog buffer insert lock. There are 8 of them, so at most 8
    // backends can insert into the xlog buffer concurrently
    START_CRIT_SECTION();
    if (isLogSwitch)
        WALInsertLockAcquireExclusive();
    else
        WALInsertLockAcquire();

    if (RedoRecPtr != Insert->RedoRecPtr)
    {
        Assert(RedoRecPtr < Insert->RedoRecPtr);
        RedoRecPtr = Insert->RedoRecPtr;
    }
    doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites);

    if (doPageWrites &&
        (!prevDoPageWrites ||
         (fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= RedoRecPtr)))
    {
        /*
         * Oops, some buffer now needs to be backed up that the caller didn't
         * back up. Start over.
         */
        WALInsertLockRelease();
        END_CRIT_SECTION();
        return InvalidXLogRecPtr;
    }

    /*
     * Reserve space for the record in the WAL. This also sets the xl_prev
     * pointer.
     */
    if (isLogSwitch)
        inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
    else
    {
        // Reserve the insert position in the xlog buffer. The buffer is one
        // contiguous block of memory with no notion of crossing segments or pages,
        // so ReserveXLogInsertLocation performs two kinds of address conversion
        ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos,
                                  &rechdr->xl_prev);
        inserted = true;
    }

    if (inserted)
    {
        /*
         * Now that xl_prev has been filled in, calculate CRC of the record
         * header.
         */
        // Finish the checksum by adding the record header
        rdata_crc = rechdr->xl_crc;
        COMP_CRC32C(rdata_crc, rechdr, offsetof(XLogRecord, xl_crc));
        FIN_CRC32C(rdata_crc);
        rechdr->xl_crc = rdata_crc;

        /*
         * All the record data, including the header, is now ready to be
         * inserted. Copy the record in the space reserved.
         */
        // Copy the xlog record data into the xlog buffer
        CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata,
                            StartPos, EndPos);

        /*
         * Unless record is flagged as not important, update LSN of last
         * important record in the current slot. When holding all locks, just
         * update the first one.
         */
        if ((flags & XLOG_MARK_UNIMPORTANT) == 0)
        {
            int         lockno = holdingAllLocks ? 0 : MyLockNo;

            WALInsertLocks[lockno].l.lastImportantAt = StartPos;
        }
    }
    else
    {
        /*
         * This was an xlog-switch record, but the current insert location was
         * already exactly at the beginning of a segment, so there was no need
         * to do anything.
         */
    }

    /*
     * Done! Let others know that we're finished.
     */
    WALInsertLockRelease();

    MarkCurrentTransactionIdLoggedIfAny();

    END_CRIT_SECTION();

    /*
     * Update shared LogwrtRqst.Write, if we crossed page boundary.
     */
    // Update the shared LogwrtRqst.Write, the position up to which an xlog write is requested
    if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
    {
        SpinLockAcquire(&XLogCtl->info_lck);
        /* advance global request to include new block(s) */
        if (XLogCtl->LogwrtRqst.Write < EndPos)
            XLogCtl->LogwrtRqst.Write = EndPos;
        /* update local result copy while I have the chance */
        LogwrtResult = XLogCtl->LogwrtResult;
        SpinLockRelease(&XLogCtl->info_lck);
    }

    /*
     * If this was an XLOG_SWITCH record, flush the record and the empty
     * padding space that fills the rest of the segment, and perform
     * end-of-segment actions (eg, notifying archiver).
     */
    if (isLogSwitch)
    {
        TRACE_POSTGRESQL_WAL_SWITCH();
        // Force the log to be flushed
        XLogFlush(EndPos);

        /*
         * Even though we reserved the rest of the segment for us, which is
         * reflected in EndPos, we return a pointer to just the end of the
         * xlog-switch record.
         */
        if (inserted)
        {
            EndPos = StartPos + SizeOfXLogRecord;
            if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
            {
                uint64      offset = XLogSegmentOffset(EndPos, wal_segment_size);

                if (offset == EndPos % XLOG_BLCKSZ)
                    EndPos += SizeOfXLogLongPHD;
                else
                    EndPos += SizeOfXLogShortPHD;
            }
        }
    }

    /*
     * Update our global variables
     */
    ProcLastRecPtr = StartPos;
    XactLastRecEnd = EndPos;

    /* Report WAL traffic to the instrumentation. */
    // Update the WAL usage statistics
    if (inserted)
    {
        pgWalUsage.wal_bytes += rechdr->xl_tot_len;
        pgWalUsage.wal_records++;
        pgWalUsage.wal_fpi += num_fpi;
    }

    return EndPos;
}

The steps above complete the registration, assembly, and insertion of an xlog record.




