
He3DB 02: Source Code Walkthrough of the XLOG Read/Write Mechanism

Original · 移动云He3DB (China Mobile Cloud He3DB) · 2023-07-06

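This installment of the compute engine series walks through the source code of the XLOG read/write mechanism. It has four parts: 1) the xlog record format, 2) allocation and initialization of the xlog buffer, 3) the xlog read path, and 4) the xlog insert path.

He3DB is a cloud-native database that separates compute from storage. By combining compute/storage separation, hot/cold data tiering with compression, and intelligent middleware, it aims to deliver high performance at low cost, so that customers get the performance they need while keeping database spending as low as possible.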

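xlog record format: source code overview

xlog record format

DETAILED DESCRIPTION

  1. Each xlog record may contain 0 to N pieces of block data, each with its accompanying metadata.
  2. If an xlog record has main data, the length is stored as a uint32 when it exceeds 255 bytes and as a uint8 otherwise.
  3. If an xlog record has main data, XLogRegisterData(...) registers it on the XLogRecData chain headed by mainrdata_head.
  4. If an xlog record has block data, XLogRegisterBuffer(...) registers the block in the registered_buffers array (one registered_buffer entry per block); the data belonging to that block is then attached with XLogRegisterBufData(...).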
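
Point 2 can be illustrated with a minimal sketch of how the main-data header is encoded. The two marker values match XLR_BLOCK_ID_DATA_SHORT and XLR_BLOCK_ID_DATA_LONG in PostgreSQL's xlogrecord.h; the helper encode_main_data_header is a hypothetical name introduced only for this illustration and is not part of PostgreSQL.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Marker bytes, as defined in PostgreSQL's xlogrecord.h. */
#define XLR_BLOCK_ID_DATA_SHORT 255
#define XLR_BLOCK_ID_DATA_LONG  254

/*
 * Hypothetical helper: write the main-data header into out and return the
 * number of bytes written (0 when the record has no main data).
 * out must have room for at least 5 bytes.
 */
static size_t
encode_main_data_header(uint32_t main_len, unsigned char *out)
{
	assert(out != NULL);

	if (main_len == 0)
		return 0;				/* no main data, no header */

	if (main_len > 255)
	{
		/* long form: marker byte followed by a 4-byte length */
		out[0] = XLR_BLOCK_ID_DATA_LONG;
		memcpy(out + 1, &main_len, sizeof(uint32_t));
		return 1 + sizeof(uint32_t);
	}

	/* short form: marker byte followed by a 1-byte length */
	out[0] = XLR_BLOCK_ID_DATA_SHORT;
	out[1] = (unsigned char) main_len;
	return 2;
}
```

The short form costs 2 bytes and the long form 5, which is why small records, the common case, use the 1-byte length.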

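xlog buffer allocation and initialization: source code overview

xlog buffer allocation and initialization

DETAILED DESCRIPTION

  1. XLOGShmemSize: computes the size of the xlog buffer, which is governed by the wal_buffers database parameter.
  2. XLOGShmemInit: initializes the xlog buffer. The global variable ControlFile caches the contents of the pg_control file. The global variable XLogCtl is the management structure of the xlog buffer; it covers the buffer index, the spinlock, the buffer space, and so on. Within it, LogwrtRqst holds the positions up to which an xlog write and flush have been requested, LogwrtResult holds the positions up to which write and flush have completed, and RedoRecPtr is the redo start position of the most recent checkpoint.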
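
The buffer-index management mentioned above can be pictured as a ring of WAL pages. The sketch below is a simplified model in the spirit of the XLogRecPtrToBufIdx macro in PostgreSQL's xlog.c; ToyXLogCtl and lsn_to_buf_idx are illustrative names, and the 8192-byte page size is the default XLOG_BLCKSZ, assumed here.

```c
#include <assert.h>
#include <stdint.h>

#define XLOG_BLCKSZ 8192		/* default WAL page size (assumed) */

typedef uint64_t XLogRecPtr;

/* Toy stand-in for the part of XLogCtl that indexes the buffer pages. */
typedef struct
{
	int			num_pages;		/* number of WAL pages in the buffer */
} ToyXLogCtl;

/*
 * Map an LSN to the buffer slot that caches its page. Pages are reused in
 * ring order, so the slot is the page number modulo the number of pages.
 */
static int
lsn_to_buf_idx(const ToyXLogCtl *ctl, XLogRecPtr lsn)
{
	assert(ctl->num_pages > 0);
	return (int) ((lsn / XLOG_BLCKSZ) % (uint64_t) ctl->num_pages);
}
```

With a 4-page buffer, the page that starts at LSN 4 * 8192 lands in slot 0 again, so the page previously held there must already have been written out before the slot is reused.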

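xlog read path: source code walkthrough

XLOG read

DETAILED DESCRIPTION

  1. XLogBeginRead: sets the starting point RecPtr for reading xlog. RecPtr may be the start of a valid xlog record or the start of a page. This call only records the position and raises no error itself; if RecPtr is invalid, the error surfaces later, during the actual read.
  2. ReadRecord: PostgreSQL maintains a global variable state that coordinates xlog reading. ReadRecord reads the next xlog record, and XLogBeginRead() must be called before the first call to ReadRecord. It returns NULL when no more xlog is available. In standby mode it keeps retrying until a valid xlog record is obtained.
  3. XLogReadRecord: reads one xlog record, starting at state->EndRecPtr. A record may span pages, in which case its data has to be reassembled.
  4. ReadPageInternal: reads the target xlog page.
  5. XLogPageRead: calls the file interface to read the target xlog page.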
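
The reassembly of a record that spans pages (step 3) can be sketched with a toy model. This is not the XLogReadRecord implementation: the page and header sizes are tiny made-up values, and toy_read_record is a hypothetical function that only shows the idea of skipping the page header each time the read crosses into a new page.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_PAGE_SZ 16			/* toy page size; real pages are XLOG_BLCKSZ */
#define TOY_HDR_SZ  4			/* toy page header size */

/*
 * Copy len payload bytes, starting at byte offset start of wal, into out,
 * skipping the header at the beginning of every page that is crossed.
 * Returns the offset just past the last byte copied.
 */
static size_t
toy_read_record(const unsigned char *wal, size_t start, size_t len,
				unsigned char *out)
{
	size_t		pos = start;
	size_t		copied = 0;

	/* a record never starts inside a page header */
	assert(start % TOY_PAGE_SZ >= TOY_HDR_SZ);

	while (copied < len)
	{
		size_t		page_left = TOY_PAGE_SZ - pos % TOY_PAGE_SZ;
		size_t		chunk = (len - copied < page_left) ? len - copied : page_left;

		memcpy(out + copied, wal + pos, chunk);
		copied += chunk;
		pos += chunk;

		/* reached a page boundary with data still to read: skip the header */
		if (copied < len && pos % TOY_PAGE_SZ == 0)
			pos += TOY_HDR_SZ;
	}
	return pos;
}
```

The real reader additionally validates each page header and checks that the continuation page declares the expected remaining length; those checks are left out here.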

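xlog insert path: source code walkthrough

xlog insertion: source code overview

XLOG insertion breaks down into three phases: 1) registering the xlog data, 2) assembling the xlog record, and 3) inserting the record into the xlog buffer. Assembly is the most involved of the three and is implemented by the function XLogRecordAssemble. The details follow:

xlog record assembly

XLogRecordAssemble assembles, in order, the xlog record header, the block data headers, the main data header, the block data, and the main data. The code is as follows: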

static XLogRecData *
XLogRecordAssemble(RmgrId rmid, uint8 info,
				   XLogRecPtr RedoRecPtr, bool doPageWrites,
				   XLogRecPtr *fpw_lsn, int *num_fpi)
{
	XLogRecData *rdt;
	// total length of the data in the rdata chain
	uint32		total_len = 0;
	int			block_id;
	pg_crc32c	rdata_crc;
	registered_buffer *prev_regbuf = NULL;
	XLogRecData *rdt_datas_last;
	XLogRecord *rechdr;
	char	   *scratch = hdr_scratch;

	/*
	 * Note: this function can be called multiple times for the same record.
	 * All the modifications we do to the rdata chains below must handle that.
	 */

	/* The record begins with the fixed-size header */
	// rechdr is the XLogRecord header, which holds the metadata of one xlog record
	rechdr = (XLogRecord *) scratch;
	// the XLogRecord header is followed by the headers describing block data, full-page images, compression, and main data
	scratch += SizeOfXLogRecord;

	hdr_rdt.next = NULL;
	rdt_datas_last = &hdr_rdt;
	hdr_rdt.data = hdr_scratch;

	/*
	 * Enforce consistency checks for this record if user is looking for it.
	 * Do this before at the beginning of this routine to give the possibility
	 * for callers of XLogInsert() to pass XLR_CHECK_CONSISTENCY directly for
	 * a record.
	 */
	if (wal_consistency_checking[rmid])
		info |= XLR_CHECK_CONSISTENCY;

	/*
	 * Make an rdata chain containing all the data portions of all block
	 * references. This includes the data for full-page images. Also append
	 * the headers for the block references in the scratch buffer.
	 */
	*fpw_lsn = InvalidXLogRecPtr;
	// Matching the record format described earlier: for every registered buffer, build its
	// header and append its block data and/or full-page image to the chain
	for (block_id = 0; block_id < max_registered_block_id; block_id++)
	{
		registered_buffer *regbuf = &registered_buffers[block_id];
		bool		needs_backup;
		bool		needs_data;
		XLogRecordBlockHeader bkpb;
		XLogRecordBlockImageHeader bimg;
		XLogRecordBlockCompressHeader cbimg = {0};
		bool		samerel;
		bool		is_compressed = false;
		bool		include_image;

		if (!regbuf->in_use)
			continue;

		/* Determine if this block needs to be backed up */
		// decide whether a full-page image of this block must be logged
		if (regbuf->flags & REGBUF_FORCE_IMAGE)
			needs_backup = true;
		else if (regbuf->flags & REGBUF_NO_IMAGE)
			needs_backup = false;
		else if (!doPageWrites)
			needs_backup = false;
		else
		{
			/*
			 * We assume page LSN is first data on *every* page that can be
			 * passed to XLogInsert, whether it has the standard page layout
			 * or not.
			 */
			XLogRecPtr	page_lsn = PageGetLSN(regbuf->page);

			needs_backup = (page_lsn <= RedoRecPtr);
			if (!needs_backup)
			{
				if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
					*fpw_lsn = page_lsn;
			}
		}

		/* Determine if the buffer data needs to be included */
		// decide whether the block data registered for this buffer must be logged
		if (regbuf->rdata_len == 0)
			needs_data = false;
		else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
			needs_data = true;
		else
			needs_data = !needs_backup;

		// fill in the XLogRecordBlockHeader for this buffer
		bkpb.id = block_id;
		bkpb.fork_flags = regbuf->forkno;
		bkpb.data_length = 0;

		if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
			bkpb.fork_flags |= BKPBLOCK_WILL_INIT;

		/*
		 * If needs_backup is true or WAL checking is enabled for current
		 * resource manager, log a full-page write for the current block.
		 */
		// include an image if a backup is needed or WAL consistency checking is enabled for this resource manager
		include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;

		// build the full-page image
		if (include_image)
		{
			Page		page = regbuf->page;
			uint16		compressed_len = 0;

			/*
			 * The page needs to be backed up, so calculate its hole length
			 * and offset.
			 */
			// work out the offset and length of the hole (unused space) in the page
			if (regbuf->flags & REGBUF_STANDARD)
			{
				/* Assume we can omit data between pd_lower and pd_upper */
				uint16		lower = ((PageHeader) page)->pd_lower;
				uint16		upper = ((PageHeader) page)->pd_upper;

				if (lower >= SizeOfPageHeaderData &&
					upper > lower &&
					upper <= BLCKSZ)
				{
					bimg.hole_offset = lower;
					cbimg.hole_length = upper - lower;
				}
				else
				{
					/* No "hole" to remove */
					bimg.hole_offset = 0;
					cbimg.hole_length = 0;
				}
			}
			else
			{
				/* Not a standard page header, don't try to eliminate "hole" */
				bimg.hole_offset = 0;
				cbimg.hole_length = 0;
			}

			/*
			 * Try to compress a block image if wal_compression is enabled
			 */
			// compress the page image
			if (wal_compression)
			{
				is_compressed =
					XLogCompressBackupBlock(page, bimg.hole_offset,
											cbimg.hole_length,
											regbuf->compressed_page,
											&compressed_len);
			}

			/*
			 * Fill in the remaining fields in the XLogRecordBlockHeader
			 * struct
			 */
			bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;

			/* Report a full page image constructed for the WAL record */
			*num_fpi += 1;

			/*
			 * Construct XLogRecData entries for the page content.
			 */
			rdt_datas_last->next = &regbuf->bkp_rdatas[0];
			rdt_datas_last = rdt_datas_last->next;

			bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;

			/*
			 * If WAL consistency checking is enabled for the resource manager
			 * of this WAL record, a full-page image is included in the record
			 * for the block modified. During redo, the full-page is replayed
			 * only if BKPIMAGE_APPLY is set.
			 */
			if (needs_backup)
				bimg.bimg_info |= BKPIMAGE_APPLY;

			// append the page image (compressed or not) to the chain
			if (is_compressed)
			{
				bimg.length = compressed_len;
				bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;

				rdt_datas_last->data = regbuf->compressed_page;
				rdt_datas_last->len = compressed_len;
			}
			else
			{
				bimg.length = BLCKSZ - cbimg.hole_length;

				if (cbimg.hole_length == 0)
				{
					// no hole: log the whole page
					rdt_datas_last->data = page;
					rdt_datas_last->len = BLCKSZ;
				}
				else
				{
					/* must skip the hole */
					// a hole exists: log the part before it and the part after it as two chain entries
					rdt_datas_last->data = page;
					rdt_datas_last->len = bimg.hole_offset;

					rdt_datas_last->next = &regbuf->bkp_rdatas[1];
					rdt_datas_last = rdt_datas_last->next;

					rdt_datas_last->data =
						page + (bimg.hole_offset + cbimg.hole_length);
					rdt_datas_last->len =
						BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
				}
			}

			total_len += bimg.length;
		}

		// append the block data supplied by the caller to the chain
		if (needs_data)
		{
			/*
			 * Link the caller-supplied rdata chain for this buffer to the
			 * overall list.
			 */
			bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
			bkpb.data_length = regbuf->rdata_len;
			total_len += regbuf->rdata_len;

			rdt_datas_last->next = regbuf->rdata_head;
			rdt_datas_last = regbuf->rdata_tail;
		}

		if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
		{
			samerel = true;
			bkpb.fork_flags |= BKPBLOCK_SAME_REL;
		}
		else
			samerel = false;
		prev_regbuf = regbuf;

		/* Ok, copy the header to the scratch buffer */
		// copy this buffer's XLogRecordBlockHeader into the scratch area
		memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
		scratch += SizeOfXLogRecordBlockHeader;
		// copy this buffer's XLogRecordBlockImageHeader, if an image is included
		if (include_image)
		{
			memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
			scratch += SizeOfXLogRecordBlockImageHeader;
			// copy the XLogRecordBlockCompressHeader, needed only when a compressed image has a hole
			if (cbimg.hole_length != 0 && is_compressed)
			{
				memcpy(scratch, &cbimg,
					   SizeOfXLogRecordBlockCompressHeader);
				scratch += SizeOfXLogRecordBlockCompressHeader;
			}
		}
		// copy the RelFileNode, unless this block belongs to the same relation as the previous one
		if (!samerel)
		{
			memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
			scratch += sizeof(RelFileNode);
		}
		// copy this buffer's BlockNumber
		memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
		scratch += sizeof(BlockNumber);
	}

	/* followed by the record's origin, if any */
	if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
		replorigin_session_origin != InvalidRepOriginId)
	{
		*(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
		memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
		scratch += sizeof(replorigin_session_origin);
	}

	/* followed by toplevel XID, if not already included in previous record */
	if (IsSubTransactionAssignmentPending())
	{
		TransactionId xid = GetTopTransactionIdIfAny();

		/* update the flag (later used by XLogResetInsertion) */
		XLogSetRecordFlags(XLOG_INCLUDE_XID);

		*(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
		memcpy(scratch, &xid, sizeof(TransactionId));
		scratch += sizeof(TransactionId);
	}

	/* followed by main data, if any */
	// write the main-data header and append the registered main data to the chain
	if (mainrdata_len > 0)
	{
		if (mainrdata_len > 255)
		{
			*(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
			memcpy(scratch, &mainrdata_len, sizeof(uint32));
			scratch += sizeof(uint32);
		}
		else
		{
			*(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
			*(scratch++) = (uint8) mainrdata_len;
		}
		rdt_datas_last->next = mainrdata_head;
		rdt_datas_last = mainrdata_last;
		total_len += mainrdata_len;
	}
	rdt_datas_last->next = NULL;

	hdr_rdt.len = (scratch - hdr_scratch);
	total_len += hdr_rdt.len;

	/*
	 * Calculate CRC of the data
	 *
	 * Note that the record header isn't added into the CRC initially since we
	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
	 * the whole record in the order: rdata, then backup blocks, then record
	 * header.
	 */
	// compute the CRC over everything except the fixed-size record header
	INIT_CRC32C(rdata_crc);
	COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
	for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);

	/*
	 * Fill in the fields in the record header. Prev-link is filled in later,
	 * once we know where in the WAL the record will be inserted. The CRC does
	 * not include the record header yet.
	 */
	// fill in the fixed-size XLogRecord header
	rechdr->xl_xid = GetCurrentTransactionIdIfAny();
	rechdr->xl_tot_len = total_len;
	rechdr->xl_info = info;
	rechdr->xl_rmid = rmid;
	rechdr->xl_prev = InvalidXLogRecPtr;
	rechdr->xl_crc = rdata_crc;

	return &hdr_rdt;
}
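
The hole handling in the full-page image branch above can be isolated into a small sketch. It mirrors the checks on pd_lower and pd_upper from the listing, but ToyImageLayout and toy_image_layout are hypothetical names, and the constants 8192 (BLCKSZ) and 24 (SizeOfPageHeaderData) are the default values, assumed here rather than taken from the article.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define BLCKSZ 8192					/* default page size (assumed) */
#define TOY_PAGE_HEADER_SIZE 24		/* assumed value of SizeOfPageHeaderData */

typedef struct
{
	uint16_t	hole_offset;		/* where the hole starts; 0 if none */
	uint16_t	hole_length;		/* bytes omitted from the image; 0 if none */
	uint16_t	image_length;		/* bytes of the page that get logged */
} ToyImageLayout;

/*
 * Compute which part of a page an uncompressed full-page image would log.
 * Only pages with the standard layout and sane pd_lower/pd_upper values get
 * a hole; every other page is logged in full.
 */
static ToyImageLayout
toy_image_layout(bool standard_page, uint16_t pd_lower, uint16_t pd_upper)
{
	ToyImageLayout layout = {0, 0, BLCKSZ};

	if (standard_page &&
		pd_lower >= TOY_PAGE_HEADER_SIZE &&
		pd_upper > pd_lower &&
		pd_upper <= BLCKSZ)
	{
		layout.hole_offset = pd_lower;
		layout.hole_length = (uint16_t) (pd_upper - pd_lower);
		layout.image_length = (uint16_t) (BLCKSZ - layout.hole_length);
	}

	assert(layout.hole_offset + layout.hole_length <= BLCKSZ);
	return layout;
}
```

For a nearly empty standard page with pd_lower = 40 and pd_upper = 8000, only 232 of the 8192 bytes end up in the record, which is why skipping the hole matters for WAL volume.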

The function XLogInsertRecord inserts the xlog record assembled above into the xlog buffer:

XLogRecPtr
XLogInsertRecord(XLogRecData *rdata,
				 XLogRecPtr fpw_lsn,
				 uint8 flags,
				 int num_fpi)
{
	// XLogCtl is the shared management structure of the xlog buffer
	XLogCtlInsert *Insert = &XLogCtl->Insert;
	pg_crc32c	rdata_crc;
	bool		inserted;
	// the XLogRecord header sits at the start of the first chain entry
	XLogRecord *rechdr = (XLogRecord *) rdata->data;
	uint8		info = rechdr->xl_info & ~XLR_INFO_MASK;
	// whether this is an XLOG_SWITCH record, which forces a switch to a new segment
	bool		isLogSwitch = (rechdr->xl_rmid == RM_XLOG_ID &&
							   info == XLOG_SWITCH);
	XLogRecPtr	StartPos;
	XLogRecPtr	EndPos;
	bool		prevDoPageWrites = doPageWrites;

	/* we assume that all of the record header is in the first chunk */
	Assert(rdata->len >= SizeOfXLogRecord);

	/* cross-check on whether we should be here or not */
	if (!XLogInsertAllowed())
		elog(ERROR, "cannot make new WAL entries during recovery");

	// acquire a WAL insertion lock; there are 8 of them, so at most 8 backends can insert into the xlog buffer concurrently
	START_CRIT_SECTION();
	if (isLogSwitch)
		WALInsertLockAcquireExclusive();
	else
		WALInsertLockAcquire();

	if (RedoRecPtr != Insert->RedoRecPtr)
	{
		Assert(RedoRecPtr < Insert->RedoRecPtr);
		RedoRecPtr = Insert->RedoRecPtr;
	}
	doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites);

	if (doPageWrites &&
		(!prevDoPageWrites ||
		 (fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= RedoRecPtr)))
	{
		/*
		 * Oops, some buffer now needs to be backed up that the caller didn't
		 * back up.  Start over.
		 */
		WALInsertLockRelease();
		END_CRIT_SECTION();
		return InvalidXLogRecPtr;
	}

	/*
	 * Reserve space for the record in the WAL. This also sets the xl_prev
	 * pointer.
	 */
	if (isLogSwitch)
		inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
	else
	{
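		// Reserve the insert position. The reservation counts usable bytes in one continuous space with no notion of page or segment boundaries, so ReserveXLogInsertLocation performs two address conversions, one for the start and one for the end position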
		ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos,
								  &rechdr->xl_prev);
		inserted = true;
	}

	if (inserted)
	{
		/*
		 * Now that xl_prev has been filled in, calculate CRC of the record
		 * header.
		 */
		// finish the CRC by adding the record header
		rdata_crc = rechdr->xl_crc;
		COMP_CRC32C(rdata_crc, rechdr, offsetof(XLogRecord, xl_crc));
		FIN_CRC32C(rdata_crc);
		rechdr->xl_crc = rdata_crc;

		/*
		 * All the record data, including the header, is now ready to be
		 * inserted. Copy the record in the space reserved.
		 */
		// copy the record data into the space reserved in the xlog buffer
		CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata,
							StartPos, EndPos);

		/*
		 * Unless record is flagged as not important, update LSN of last
		 * important record in the current slot. When holding all locks, just
		 * update the first one.
		 */
		if ((flags & XLOG_MARK_UNIMPORTANT) == 0)
		{
			int			lockno = holdingAllLocks ? 0 : MyLockNo;

			WALInsertLocks[lockno].l.lastImportantAt = StartPos;
		}
	}
	else
	{
		/*
		 * This was an xlog-switch record, but the current insert location was
		 * already exactly at the beginning of a segment, so there was no need
		 * to do anything.
		 */
	}

	/*
	 * Done! Let others know that we're finished.
	 */
	WALInsertLockRelease();

	MarkCurrentTransactionIdLoggedIfAny();

	END_CRIT_SECTION();

	/*
	 * Update shared LogwrtRqst.Write, if we crossed page boundary.
	 */
	// advance the shared LogwrtRqst.Write, the position up to which an xlog write is requested
	if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
	{
		SpinLockAcquire(&XLogCtl->info_lck);
		/* advance global request to include new block(s) */
		if (XLogCtl->LogwrtRqst.Write < EndPos)
			XLogCtl->LogwrtRqst.Write = EndPos;
		/* update local result copy while I have the chance */
		LogwrtResult = XLogCtl->LogwrtResult;
		SpinLockRelease(&XLogCtl->info_lck);
	}

	/*
	 * If this was an XLOG_SWITCH record, flush the record and the empty
	 * padding space that fills the rest of the segment, and perform
	 * end-of-segment actions (eg, notifying archiver).
	 */
	if (isLogSwitch)
	{
		TRACE_POSTGRESQL_WAL_SWITCH();
		// force the log to be flushed up to EndPos
		XLogFlush(EndPos);

		/*
		 * Even though we reserved the rest of the segment for us, which is
		 * reflected in EndPos, we return a pointer to just the end of the
		 * xlog-switch record.
		 */
		if (inserted)
		{
			EndPos = StartPos + SizeOfXLogRecord;
			if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
			{
				uint64		offset = XLogSegmentOffset(EndPos, wal_segment_size);

				if (offset == EndPos % XLOG_BLCKSZ)
					EndPos += SizeOfXLogLongPHD;
				else
					EndPos += SizeOfXLogShortPHD;
			}
		}
	}

	/*
	 * Update our global variables
	 */
	ProcLastRecPtr = StartPos;
	XactLastRecEnd = EndPos;

	/* Report WAL traffic to the instrumentation. */
	// WAL usage statistics
	if (inserted)
	{
		pgWalUsage.wal_bytes += rechdr->xl_tot_len;
		pgWalUsage.wal_records++;
		pgWalUsage.wal_fpi += num_fpi;
	}

	return EndPos;
}
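
The two listings split the checksum work: XLogRecordAssemble covers the data, and XLogInsertRecord adds the record header once xl_prev is known. The sketch below illustrates why that split is valid: a CRC can be fed in pieces, and only the order of the pieces matters. It uses a plain bitwise CRC-32C rather than PostgreSQL's optimized implementation, and crc32c_update, toy_record_crc, and the TOY_ macros are names made up for this illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_CRC_INIT 0xFFFFFFFFu
#define TOY_CRC_FIN(crc) ((crc) ^ 0xFFFFFFFFu)

/* Bitwise CRC-32C (Castagnoli), reflected polynomial 0x82F63B78. */
static uint32_t
crc32c_update(uint32_t crc, const void *data, size_t len)
{
	const unsigned char *p = data;

	assert(data != NULL || len == 0);

	while (len-- > 0)
	{
		crc ^= *p++;
		for (int bit = 0; bit < 8; bit++)
			crc = (crc >> 1) ^ (0x82F63B78u & (0u - (crc & 1u)));
	}
	return crc;
}

/*
 * Checksum a toy record in the same order as the listings: payload first
 * (done while assembling), header second (done once the header is final).
 */
static uint32_t
toy_record_crc(const void *hdr, size_t hdr_len,
			   const void *payload, size_t payload_len)
{
	uint32_t	crc = TOY_CRC_INIT;

	crc = crc32c_update(crc, payload, payload_len);	/* phase 1: assembly */
	crc = crc32c_update(crc, hdr, hdr_len);			/* phase 2: insertion */
	return TOY_CRC_FIN(crc);
}
```

Feeding "12345" as the payload and "6789" as the header gives the same value as one pass over "123456789", so the intermediate CRC can safely be parked in xl_crc between the two phases.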

With the steps above, the registration, assembly, and insertion of an xlog record are complete!
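
One detail worth spelling out is the "Start over" branch in XLogInsertRecord: when it returns InvalidXLogRecPtr, the caller has to assemble the record again and retry. The following toy model sketches that interplay under simplifying assumptions: a single page, full-page writes always enabled, and a fixed record length that ignores the size of the image. ToyWal and the toy_ functions are hypothetical and are not PostgreSQL APIs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Toy state: one page, a backend-local redo pointer, and the shared one. */
typedef struct
{
	XLogRecPtr	page_lsn;
	XLogRecPtr	local_redo;		/* possibly stale copy */
	XLogRecPtr	shared_redo;	/* authoritative value, read under the lock */
	XLogRecPtr	insert_pos;		/* next free position in the toy WAL */
} ToyWal;

/* Assembly: decide whether the page needs a full-page image. */
static bool
toy_assemble(const ToyWal *wal, XLogRecPtr *fpw_lsn)
{
	bool		needs_backup = (wal->page_lsn <= wal->local_redo);

	/* remember the LSN of the page whose image was skipped */
	*fpw_lsn = needs_backup ? InvalidXLogRecPtr : wal->page_lsn;
	return needs_backup;
}

/* Insertion: returns InvalidXLogRecPtr if the record must be re-assembled. */
static XLogRecPtr
toy_insert(ToyWal *wal, XLogRecPtr fpw_lsn, uint32_t rec_len)
{
	wal->local_redo = wal->shared_redo;	/* refresh the local copy */
	if (fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= wal->local_redo)
		return InvalidXLogRecPtr;		/* a skipped image is now required */
	wal->insert_pos += rec_len;
	return wal->insert_pos;
}

/* Assemble-and-insert loop: repeat until the insertion is accepted. */
static XLogRecPtr
toy_xlog_insert(ToyWal *wal, uint32_t rec_len, int *attempts, bool *with_image)
{
	XLogRecPtr	end_pos;
	XLogRecPtr	fpw_lsn;

	assert(rec_len > 0);
	*attempts = 0;
	do
	{
		*with_image = toy_assemble(wal, &fpw_lsn);
		end_pos = toy_insert(wal, fpw_lsn, rec_len);
		(*attempts)++;
	} while (end_pos == InvalidXLogRecPtr);
	return end_pos;
}
```

If a checkpoint moves the shared redo pointer past the page LSN between assembly and insertion, the first attempt is rejected and the second one carries the full-page image. This is also why the comment at the top of XLogRecordAssemble notes that the function can be called multiple times for the same record.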


Last modified: 2023-07-06 16:44:37