暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

海山数据库(He3DB)源码详解:XLogFlush函数

He3DB_ht 2024-08-21
66

XLogFlush函数

作用:确保WAL日志写入缓冲区-刷新到磁盘上

  • 函数定义
void XLogFlush(XLogRecPtr record)

XLogRecPtr --记录XLOG日志的位置

typedef uint64 XLogRecPtr;

record – 当前 尝试写入、刷新的WAL日志位置

  • 变量定义
XLogRecPtr WriteRqstPtr; XLogwrtRqst WriteRqst; TimeLineID insertTLI = XLogCtl->InsertTimeLineID;

XLogwrtRqst结构体,表示请求写入/刷新的WAL日志的位置

typedef struct XLogwrtRqst { XLogRecPtr Write; /* last byte + 1 to write out */ XLogRecPtr Flush; /* last byte + 1 to flush */ } XLogwrtRqst;
  • Write:写入缓冲区,但没落盘
  • Flush:写入磁盘
typedef uint32 TimeLineID;

简称TLI,数据库不同历史状态的标识符。用于防止在恢复数据库到之前的状态(即进行时间点恢复)时发生混淆

  • 条件判断
if (!XLogInsertAllowed()) { UpdateMinRecoveryPoint(record, false); return; }

XLogInsertAllowed函数

LocalXLogInsertAllowed为

  • 1:允许插入XLOG
  • 0:不允许插入XLOG
  • -1:检测RecoveryInProgress()
bool XLogInsertAllowed(void) { if (LocalXLogInsertAllowed >= 0) return (bool) LocalXLogInsertAllowed; if (RecoveryInProgress()) return false; LocalXLogInsertAllowed = 1; return true; }
  • 不允许WAL插入或者正处于恢复,则调用UpdateMinRecoveryPoint(record, false)以更新最小恢复点
  • 检查record是否已经被flush
if (record <= LogwrtResult.Flush) return;

如果record已经被刷新过了,直接return

其中

static XLogwrtResult LogwrtResult = {0, 0}; typedef struct XLogwrtResult { XLogRecPtr Write; /* last byte + 1 written out */ XLogRecPtr Flush; /* last byte + 1 flushed */ } XLogwrtResult;

注意:和XLogwrtRqst结构体不同,这里是result,表示已经完成写入缓冲区、刷新到磁盘的位置

  • 如果定义了 WAL_DEBUG
#ifdef WAL_DEBUG if (XLOG_DEBUG) elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X", LSN_FORMAT_ARGS(record), LSN_FORMAT_ARGS(LogwrtResult.Write), LSN_FORMAT_ARGS(LogwrtResult.Flush));

XLOG_DEBUG 为true

则输出日志信息

  • 临界区开始

    START_CRIT_SECTION();

    函数宏定义:

    #define START_CRIT_SECTION() (CritSectionCount++)

    作用:防止并发或多线程时候产生数据竞争

    临界资源:一次 仅允许一个进程或线程使用的共享资源
    临界区:一个访问共享资源的代码段
    CritSectionCount:临界区中进程或线程的数量

  • 初始化指针WriteRqstPtr

    WriteRqstPtr = record;
  • 死循环

    XLogRecPtr insertpos;

    存储WAL插入的位置

    SpinLockAcquire(&XLogCtl->info_lck);

    前面开启了临界区,仅一个进程或线程能访问共享资源,这里用自旋锁保护共享资源访问,避免数据竞争

    SpinLockAcquire

    • 用于获取自旋锁。如果锁 被其他线程持有,则立即返回,表示锁已成功获取。
    • 如果锁 已被 其他线程持有,则当前线程会进入一个忙等待的循环,直到锁被释放并被当前线程成功获取
    if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write) WriteRqstPtr = XLogCtl->LogwrtRqst.Write; LogwrtResult = XLogCtl->LogwrtResult; SpinLockRelease(&XLogCtl->info_lck);

    更新WriteRqstPtr和LogwrtResult的值
    释放自旋锁

    insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);

    在实际写入之前,等待所有正在进行的、即将写入的插入操作完成

    等待前序操作完成后 ,接下来要开始写入,再次之前需要获取写的锁

    if (!LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE)) { continue; }

    尝试获取WAL写锁。如果无法立即获取,则等待直到锁被释放,并重新检查是否需要执行刷新操作。

    LogwrtResult = XLogCtl->LogwrtResult; if (record <= LogwrtResult.Flush) { LWLockRelease(WALWriteLock); break; }

    成功获取到锁后,再次更新当前日志写和刷新的
    结果,并检测当前记录是否已经被刷新

    如果已经记录已经被flush,则释放锁,并跳出循环

    频繁写入磁盘,IO增加,为了优化,通常采取批量处理写入操作,减少磁盘IO次数

    if (CommitDelay > 0 && enableFsync && MinimumActiveBackends(CommitSiblings)) { pg_usleep(CommitDelay); insertpos = WaitXLogInsertionsToFinish(insertpos); }

    条件判断:

    • 是否设置提交延迟
    • 是否启用Fsync-fsync是确保数据真正写入磁盘的系统调用,但会引入性能开销
    • 检查是否有足量的其他活动后端(即并发执行的事务)达到了CommitSiblings的阈值

    如果条件都满足

    • 根据CommitDelay的值,调用pg_usleep函数使当前线程暂停指定的微秒数。
      目的:给其他并发事务足够的时间来加入即将发生的组提交中。组提交是一种优化技术,允许多个事务的更改在同一个磁盘I/O操作中一起被写入,从而减少磁盘I/O的次数。
    • 等待所有正在进行的、即将写入的插入操作完成
    WriteRqst.Write = insertpos; WriteRqst.Flush = insertpos; XLogWrite(WriteRqst, insertTLI, false); LWLockRelease(WALWriteLock); break;
    • 记录写入、刷新的位置
    • 调用XLogWrite写入和刷新WAL
    • 释放WAL写锁,并跳出死循环
  • 结束临界区

END_CRIT_SECTION();

函数宏定义:

#define END_CRIT_SECTION() do { Assert(CritSectionCount > 0); CritSectionCount--; } while(0)

CritSectionCount–:临界区中一个线程或进程退出

  • 唤醒wal发送进程请求
WalSndWakeupProcessRequests();

函数宏:

#define WalSndWakeupProcessRequests() do { if (wake_wal_senders) { wake_wal_senders = false; if (max_wal_senders > 0) WalSndWakeup(); } } while (0)

wake_wal_senders :是否有必要唤醒WAL发送进程

发送进程:实现流复制的一部分,负责将主服务器生成的WAL日志实时地发送到从服务器

  • 在流复制中,一个或多个服务器连接到主服务器,并实时地接收并应用从主服务器发送过来的WAL日志。这样,服务器就能够保持与服务器数据的一致性

  • WalSndWakeupProcessRequests()宏的目的:检查是否需要唤醒WAL发送进程。在PostgreSQL中,可能由于多种原因(如新的WAL数据需要被发送到从服务器,或者从服务器请求了更多的WAL数据)而需要唤醒这些进程。

若有必要

  • 重置wake_wal_senders为false,避免重复触发
  • 若最大发送进程大于0,则唤醒
if (LogwrtResult.Flush < record) elog(ERROR, "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X", LSN_FORMAT_ARGS(record), LSN_FORMAT_ARGS(LogwrtResult.Flush));

record:当前尝试刷新到磁盘的WAL记录
LogwrtResult.Flush:当前已经成功刷新到磁盘的WAL记录

if (LogwrtResult.Flush < record)

如果条件成立:

说明期望刷新到record指定的位置,但实际只刷到了LogwrtResult.Flush指定的位置

elog报错,并记录二者指定的位置

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论