暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

He3DB团队源码解读:pg_prewarm插件源码解读

原创 红烧肉 2023-11-06
142

He3DB 是由移动云数据库团队研发的一款计算/存储分离的云原生数据库,He3DB通过计算/存储分离、数据冷热分层和压缩、智能中间件等技术,来保证高性能和低成本完美兼得,在获得高性能的同时,最大化的帮助客户节省数据库使用成本。

pg_prewarm简介

pg_prewarm是一个用于预热PostgreSQL数据库的插件。它可以在数据库启动或指定的时间间隔内预先加载表或索引的数据到内存中,以加快查询性能。

当数据库重启时,通常会有一段时间需要等待数据从磁盘加载到内存中,这会导致查询的响应时间较长。而使用pg_prewarm插件可以在数据库启动时将特定的表或索引加载到内存中,从而避免这种等待时间。pg_prewarm插件可以通过在postgresql.conf配置文件中指定需要预热的表或索引的名称来使用。它可以预热整个表或只预热表中的特定分区。插件还提供了可选的预热策略,包括从前往后或从后往前预热。预热过程是在数据库启动或者指定的时间间隔内进行的,可以通过修改配置文件中的参数来调整预热的时间间隔和频率。预热操作会在数据库日志中记录,以便管理员可以监控和分析预热的效果。总的来说,pg_prewarm插件是一个提高PostgreSQL查询性能的工具,通过预先加载数据到内存中,减少了查询时的磁盘访问时间,从而提升了整体的数据库性能。本文将对pg_prewarm进行核心源码解读。

dump缓存数据

apw_dump_now函数以{database,tablespace,filenode,forknum,blocknum}的格式dump缓存空间中的缓存数据,并将dump数据存储到autoprewarm.blocks文件中。关键代码下:

static int apw_dump_now(bool is_bgworker, bool dump_unlogged) { int num_blocks; int i; int ret; BlockInfoRecord *block_info_array; BufferDesc *bufHdr; FILE *file; char transient_dump_file_path[MAXPGPATH]; pid_t pid; LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE); pid = apw_state->pid_using_dumpfile; if (apw_state->pid_using_dumpfile == InvalidPid) apw_state->pid_using_dumpfile = MyProcPid; LWLockRelease(&apw_state->lock); if (pid != InvalidPid) { if (!is_bgworker) ereport(ERROR, (errmsg("could not perform block dump because dump file is being used by PID %lu", (unsigned long) apw_state->pid_using_dumpfile))); ereport(LOG, (errmsg("skipping block dump because it is already being performed by PID %lu", (unsigned long) apw_state->pid_using_dumpfile))); return 0; } block_info_array = (BlockInfoRecord *) palloc(sizeof(BlockInfoRecord) * NBuffers); for (num_blocks = 0, i = 0; i < NBuffers; i++) { uint32 buf_state; CHECK_FOR_INTERRUPTS(); bufHdr = GetBufferDescriptor(i); /* Lock each buffer header before inspecting. */ buf_state = LockBufHdr(bufHdr); /* * Unlogged tables will be automatically truncated after a crash or * unclean shutdown. In such cases we need not prewarm them. Dump them * only if requested by caller. */ if (buf_state & BM_TAG_VALID && ((buf_state & BM_PERMANENT) || dump_unlogged)) { block_info_array[num_blocks].database = bufHdr->tag.rnode.dbNode; block_info_array[num_blocks].tablespace = bufHdr->tag.rnode.spcNode; block_info_array[num_blocks].filenode = bufHdr->tag.rnode.relNode; block_info_array[num_blocks].forknum = bufHdr->tag.forkNum; block_info_array[num_blocks].blocknum = bufHdr->tag.blockNum; ++num_blocks; } UnlockBufHdr(bufHdr, buf_state); } snprintf(transient_dump_file_path, MAXPGPATH, "%s.tmp", AUTOPREWARM_FILE); file = AllocateFile(transient_dump_file_path, "w"); if (!file) ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\": %m", transient_dump_file_path))); ret = fprintf(file, "<<%d>>\n", num_blocks); if (ret < 0) { int save_errno = errno; FreeFile(file); unlink(transient_dump_file_path); errno = save_errno; ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to file \"%s\": %m", transient_dump_file_path))); } for (i = 0; i < num_blocks; i++) { CHECK_FOR_INTERRUPTS(); ret = fprintf(file, "%u,%u,%u,%u,%u\n", block_info_array[i].database, block_info_array[i].tablespace, block_info_array[i].filenode, (uint32) block_info_array[i].forknum, block_info_array[i].blocknum); if (ret < 0) { int save_errno = errno; FreeFile(file); unlink(transient_dump_file_path); errno = save_errno; ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to file \"%s\": %m", transient_dump_file_path))); } } pfree(block_info_array); /* * Rename transient_dump_file_path to AUTOPREWARM_FILE to make things * permanent. */ ret = FreeFile(file); if (ret != 0) { int save_errno = errno; unlink(transient_dump_file_path); errno = save_errno; ereport(ERROR, (errcode_for_file_access(), errmsg("could not close file \"%s\": %m", transient_dump_file_path))); } (void) durable_rename(transient_dump_file_path, AUTOPREWARM_FILE, ERROR); apw_state->pid_using_dumpfile = InvalidPid; ereport(DEBUG1, (errmsg_internal("wrote block details for %d blocks", num_blocks))); return num_blocks; }

apw_dump_now函数会遍历整个shared_buffers,记录每一个缓存数据页的元数据信息,包括database、tablespace、filenode、forknum、blocknum,遍历结束后,将元数据信息以文件的形式进行落盘保存,文件名为autoprewarm.blocks。
图片

数据预热

autoprewarm_main函数会在数据库启动时被调用,并根据配置参数执行dump缓存数据操作,pg_prewarm.autoprewarm_interval参数用于指的dump间隔。除了调用apw_dump_now执行dump缓存数据操作外,启动时它还会预先调用apw_load_buffers函数读取autoprewarm.blocks文件,逐个加载指定的表或索引数据到内存中,以提高查询性能。

static void apw_load_buffers(void) { FILE *file = NULL; int num_elements, i; BlockInfoRecord *blkinfo; dsm_segment *seg; /* * Skip the prewarm if the dump file is in use; otherwise, prevent any * other process from writing it while we're using it. */ LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE); if (apw_state->pid_using_dumpfile == InvalidPid) apw_state->pid_using_dumpfile = MyProcPid; else { LWLockRelease(&apw_state->lock); ereport(LOG, (errmsg("skipping prewarm because block dump file is being written by PID %lu", (unsigned long) apw_state->pid_using_dumpfile))); return; } LWLockRelease(&apw_state->lock); /* * Open the block dump file. Exit quietly if it doesn't exist, but report * any other error. */ file = AllocateFile(AUTOPREWARM_FILE, "r"); if (!file) { if (errno == ENOENT) { LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE); apw_state->pid_using_dumpfile = InvalidPid; LWLockRelease(&apw_state->lock); return; /* No file to load. */ } ereport(ERROR, (errcode_for_file_access(), errmsg("could not read file \"%s\": %m", AUTOPREWARM_FILE))); } /* First line of the file is a record count. */ if (fscanf(file, "<<%d>>\n", &num_elements) != 1) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from file \"%s\": %m", AUTOPREWARM_FILE))); /* Allocate a dynamic shared memory segment to store the record data. */ seg = dsm_create(sizeof(BlockInfoRecord) * num_elements, 0); blkinfo = (BlockInfoRecord *) dsm_segment_address(seg); /* Read records, one per line. */ for (i = 0; i < num_elements; i++) { unsigned forknum; if (fscanf(file, "%u,%u,%u,%u,%u\n", &blkinfo[i].database, &blkinfo[i].tablespace, &blkinfo[i].filenode, &forknum, &blkinfo[i].blocknum) != 5) ereport(ERROR, (errmsg("autoprewarm block dump file is corrupted at line %d", i + 1))); blkinfo[i].forknum = forknum; } FreeFile(file); /* Sort the blocks to be loaded. */ pg_qsort(blkinfo, num_elements, sizeof(BlockInfoRecord), apw_compare_blockinfo); /* Populate shared memory state. */ apw_state->block_info_handle = dsm_segment_handle(seg); apw_state->prewarm_start_idx = apw_state->prewarm_stop_idx = 0; apw_state->prewarmed_blocks = 0; /* Get the info position of the first block of the next database. */ while (apw_state->prewarm_start_idx < num_elements) { int j = apw_state->prewarm_start_idx; Oid current_db = blkinfo[j].database; /* * Advance the prewarm_stop_idx to the first BlockInfoRecord that does * not belong to this database. */ j++; while (j < num_elements) { if (current_db != blkinfo[j].database) { /* * Combine BlockInfoRecords for global objects with those of * the database. */ if (current_db != InvalidOid) break; current_db = blkinfo[j].database; } j++; } /* * If we reach this point with current_db == InvalidOid, then only * BlockInfoRecords belonging to global objects exist. We can't * prewarm without a database connection, so just bail out. */ if (current_db == InvalidOid) break; /* Configure stop point and database for next per-database worker. */ apw_state->prewarm_stop_idx = j; apw_state->database = current_db; Assert(apw_state->prewarm_start_idx < apw_state->prewarm_stop_idx); /* If we've run out of free buffers, don't launch another worker. */ if (!have_free_buffer()) break; /* * Likewise, don't launch if we've already been told to shut down. * (The launch would fail anyway, but we might as well skip it.) */ if (ShutdownRequestPending) break; /* * Start a per-database worker to load blocks for this database; this * function will return once the per-database worker exits. */ apw_start_database_worker(); /* Prepare for next database. */ apw_state->prewarm_start_idx = apw_state->prewarm_stop_idx; } /* Clean up. */ dsm_detach(seg); LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE); apw_state->block_info_handle = DSM_HANDLE_INVALID; apw_state->pid_using_dumpfile = InvalidPid; LWLockRelease(&apw_state->lock); /* Report our success, if we were able to finish. */ if (!ShutdownRequestPending) ereport(LOG, (errmsg("autoprewarm successfully prewarmed %d of %d previously-loaded blocks", apw_state->prewarmed_blocks, num_elements))); }

apw_load_buffers(void)函数首先将autoprewarm.blocks文件读入内存,按照database、tablespace、filenode、forknum、blocknum的顺序进行排序,然后以database为加载单位,进行数据加载。加载动作由autoprewarm_database_main负责实现。
autoprewarm_database_main函数负责建立数据库连接,然后依次读取需要预热的数据页,直至本database中所有目标数据页全部预热完成,并开始另外database数据的预热。

void autoprewarm_database_main(Datum main_arg) { int pos; BlockInfoRecord *block_info; Relation rel = NULL; BlockNumber nblocks = 0; BlockInfoRecord *old_blk = NULL; dsm_segment *seg; /* Establish signal handlers; once that's done, unblock signals. */ pqsignal(SIGTERM, die); BackgroundWorkerUnblockSignals(); /* Connect to correct database and get block information. */ apw_init_shmem(); seg = dsm_attach(apw_state->block_info_handle); if (seg == NULL) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("could not map dynamic shared memory segment"))); BackgroundWorkerInitializeConnectionByOid(apw_state->database, InvalidOid, 0); block_info = (BlockInfoRecord *) dsm_segment_address(seg); pos = apw_state->prewarm_start_idx; /* * Loop until we run out of blocks to prewarm or until we run out of free * buffers. */ while (pos < apw_state->prewarm_stop_idx && have_free_buffer()) { BlockInfoRecord *blk = &block_info[pos++]; Buffer buf; CHECK_FOR_INTERRUPTS(); /* * Quit if we've reached records for another database. If previous * blocks are of some global objects, then continue pre-warming. */ if (old_blk != NULL && old_blk->database != blk->database && old_blk->database != 0) break; /* * As soon as we encounter a block of a new relation, close the old * relation. Note that rel will be NULL if try_relation_open failed * previously; in that case, there is nothing to close. */ if (old_blk != NULL && old_blk->filenode != blk->filenode && rel != NULL) { relation_close(rel, AccessShareLock); rel = NULL; CommitTransactionCommand(); } /* * Try to open each new relation, but only once, when we first * encounter it. If it's been dropped, skip the associated blocks. */ if (old_blk == NULL || old_blk->filenode != blk->filenode) { Oid reloid; Assert(rel == NULL); StartTransactionCommand(); reloid = RelidByRelfilenode(blk->tablespace, blk->filenode); if (OidIsValid(reloid)) rel = try_relation_open(reloid, AccessShareLock); if (!rel) CommitTransactionCommand(); } if (!rel) { old_blk = blk; continue; } /* Once per fork, check for fork existence and size. */ if (old_blk == NULL || old_blk->filenode != blk->filenode || old_blk->forknum != blk->forknum) { RelationOpenSmgr(rel); /* * smgrexists is not safe for illegal forknum, hence check whether * the passed forknum is valid before using it in smgrexists. */ if (blk->forknum > InvalidForkNumber && blk->forknum <= MAX_FORKNUM && smgrexists(rel->rd_smgr, blk->forknum)) nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum); else nblocks = 0; } /* Check whether blocknum is valid and within fork file size. */ if (blk->blocknum >= nblocks) { /* Move to next forknum. */ old_blk = blk; continue; } /* Prewarm buffer. */ buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL, NULL); if (BufferIsValid(buf)) { apw_state->prewarmed_blocks++; ReleaseBuffer(buf); } old_blk = blk; } dsm_detach(seg); /* Release lock on previous relation. */ if (rel) { relation_close(rel, AccessShareLock); CommitTransactionCommand(); } }

以上就是pg_prewarm插件的核心源码解读。

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论