XLogFlush函数
作用:确保WAL日志写入缓冲区-刷新到磁盘上
- 函数定义
void XLogFlush(XLogRecPtr record)
XLogRecPtr --记录XLOG日志的位置
typedef uint64 XLogRecPtr;
record – 当前 尝试写入、刷新的WAL日志位置
- 变量定义
XLogRecPtr WriteRqstPtr; XLogwrtRqst WriteRqst; TimeLineID insertTLI = XLogCtl->InsertTimeLineID;
XLogwrtRqst结构体,表示请求写入/刷新的WAL日志的位置
typedef struct XLogwrtRqst
{
XLogRecPtr Write; /* last byte + 1 to write out */
XLogRecPtr Flush; /* last byte + 1 to flush */
} XLogwrtRqst;
- Write:写入缓冲区,但没落盘
- Flush:写入磁盘
typedef uint32 TimeLineID;
简称TLI,数据库不同历史状态的标识符。用于防止在恢复数据库到之前的状态(即进行时间点恢复)时发生混淆
- 条件判断
if (!XLogInsertAllowed())
{
UpdateMinRecoveryPoint(record, false);
return;
}
XLogInsertAllowed函数
LocalXLogInsertAllowed为
- 1:允许插入XLOG
- 0:不允许插入XLOG
- -1:检测RecoveryInProgress()
bool XLogInsertAllowed(void)
{
if (LocalXLogInsertAllowed >= 0)
return (bool) LocalXLogInsertAllowed;
if (RecoveryInProgress())
return false;
LocalXLogInsertAllowed = 1;
return true;
}
- 不允许WAL插入或者正处于恢复,则调用UpdateMinRecoveryPoint(record, false)以更新最小恢复点
- 检查record是否已经被flush
if (record <= LogwrtResult.Flush)
return;
如果record已经被刷新过了,直接return
其中
static XLogwrtResult LogwrtResult = {0, 0};
typedef struct XLogwrtResult
{
XLogRecPtr Write; /* last byte + 1 written out */
XLogRecPtr Flush; /* last byte + 1 flushed */
} XLogwrtResult;
注意:和XLogwrtRqst结构体不同,这里是result,表示已经完成写入缓冲区、刷新到磁盘的位置
- 如果定义了 WAL_DEBUG
#ifdef WAL_DEBUG
if (XLOG_DEBUG)
elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
LSN_FORMAT_ARGS(record),
LSN_FORMAT_ARGS(LogwrtResult.Write),
LSN_FORMAT_ARGS(LogwrtResult.Flush));
且 XLOG_DEBUG 为true
则输出日志信息
-
临界区开始
START_CRIT_SECTION();函数宏定义:
#define START_CRIT_SECTION() (CritSectionCount++)作用:防止并发或多线程时候产生数据竞争
临界资源:一次 仅允许一个进程或线程使用的共享资源
临界区:一个访问共享资源的代码段
CritSectionCount:临界区中进程或线程的数量 -
初始化指针WriteRqstPtr
WriteRqstPtr = record; -
死循环
XLogRecPtr insertpos;存储WAL插入的位置
SpinLockAcquire(&XLogCtl->info_lck);前面开启了临界区,仅一个进程或线程能访问共享资源,这里用自旋锁保护共享资源访问,避免数据竞争
SpinLockAcquire
- 用于获取自旋锁。如果锁 未 被其他线程持有,则立即返回,表示锁已成功获取。
- 如果锁 已被 其他线程持有,则当前线程会进入一个忙等待的循环,直到锁被释放并被当前线程成功获取
if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write) WriteRqstPtr = XLogCtl->LogwrtRqst.Write; LogwrtResult = XLogCtl->LogwrtResult; SpinLockRelease(&XLogCtl->info_lck);更新WriteRqstPtr和LogwrtResult的值
释放自旋锁insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);在实际写入之前,等待所有正在进行的、即将写入的插入操作完成
等待前序操作完成后 ,接下来要开始写入,再次之前需要获取写的锁
if (!LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE)) { continue; }尝试获取WAL写锁。如果无法立即获取,则等待直到锁被释放,并重新检查是否需要执行刷新操作。
LogwrtResult = XLogCtl->LogwrtResult; if (record <= LogwrtResult.Flush) { LWLockRelease(WALWriteLock); break; }成功获取到锁后,再次更新当前日志写和刷新的
结果,并检测当前记录是否已经被刷新如果已经记录已经被flush,则释放锁,并跳出循环
频繁写入磁盘,IO增加,为了优化,通常采取批量处理写入操作,减少磁盘IO次数
if (CommitDelay > 0 && enableFsync && MinimumActiveBackends(CommitSiblings)) { pg_usleep(CommitDelay); insertpos = WaitXLogInsertionsToFinish(insertpos); }条件判断:
- 是否设置提交延迟
- 是否启用Fsync-fsync是确保数据真正写入磁盘的系统调用,但会引入性能开销
- 检查是否有足量的其他活动后端(即并发执行的事务)达到了CommitSiblings的阈值
如果条件都满足
- 根据CommitDelay的值,调用pg_usleep函数使当前线程暂停指定的微秒数。
目的:给其他并发事务足够的时间来加入即将发生的组提交中。组提交是一种优化技术,允许多个事务的更改在同一个磁盘I/O操作中一起被写入,从而减少磁盘I/O的次数。 - 等待所有正在进行的、即将写入的插入操作完成
WriteRqst.Write = insertpos; WriteRqst.Flush = insertpos; XLogWrite(WriteRqst, insertTLI, false); LWLockRelease(WALWriteLock); break;- 记录写入、刷新的位置
- 调用XLogWrite写入和刷新WAL
- 释放WAL写锁,并跳出死循环
-
结束临界区
END_CRIT_SECTION();
函数宏定义:
#define END_CRIT_SECTION()
do {
Assert(CritSectionCount > 0);
CritSectionCount--;
} while(0)
CritSectionCount–:临界区中一个线程或进程退出
- 唤醒wal发送进程请求
WalSndWakeupProcessRequests();
函数宏:
#define WalSndWakeupProcessRequests()
do
{
if (wake_wal_senders)
{
wake_wal_senders = false;
if (max_wal_senders > 0)
WalSndWakeup();
}
} while (0)
wake_wal_senders :是否有必要唤醒WAL发送进程
发送进程:实现流复制的一部分,负责将主服务器生成的WAL日志实时地发送到从服务器
在流复制中,一个或多个从服务器连接到主服务器,并实时地接收并应用从主服务器发送过来的WAL日志。这样,从服务器就能够保持与主服务器数据的一致性
WalSndWakeupProcessRequests()宏的目的:检查是否需要唤醒WAL发送进程。在PostgreSQL中,可能由于多种原因(如新的WAL数据需要被发送到从服务器,或者从服务器请求了更多的WAL数据)而需要唤醒这些进程。
若有必要
- 重置wake_wal_senders为false,避免重复触发
- 若最大发送进程大于0,则唤醒
if (LogwrtResult.Flush < record)
elog(ERROR,
"xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
LSN_FORMAT_ARGS(record),
LSN_FORMAT_ARGS(LogwrtResult.Flush));
record:当前尝试刷新到磁盘的WAL记录
LogwrtResult.Flush:当前已经成功刷新到磁盘的WAL记录
if (LogwrtResult.Flush < record)
如果条件成立:
说明期望刷新到record指定的位置,但实际只刷到了LogwrtResult.Flush指定的位置
elog报错,并记录二者指定的位置




