暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

海山数据库(He3DB)源码详解:海山Mysql 崩溃恢复(3)-日志应用

He3DB_ht 2024-12-20
28

海山数据库(He3DB)源码详解:海山Mysql 崩溃恢复(3)-日志应用

崩溃恢复之日志应用

recv_recovery_from_checkpoint_start_for_se(flushed_lsn)为崩溃恢复的入口函数,其从innobase_start_or_create_for_mysql()调用,传入参数flushed_lsn是读取存在系统表空间的fil_write_flushed_lsn获得的,该函数在日志解析、日志应用等线程创建之前,进行崩溃恢复操作。

MySQL redo日志崩溃恢复整体流程如图所示,其中,日志应用的流程主要集中在右边部分,recv_parse_or_apply_log_rec_body用于解析或应用日志记录体,recv_recover_page_func用于恢复页面函数,这是应用日志记录到数据库页面的关键步骤。

图中还显示了日志解析和日志应用流程之间的交互,特别是在解析日志记录后,如何将它们应用到数据库页面以完成恢复过程。本文将针对日志应用过程中涉及到的几个函数源码进行详细解析。

在这里插入图片描述

1、recv_apply_hashed_log_recs()函数流程

void recv_apply_hashed_log_recs( /*=======================*/ ibool allow_ibuf) { recv_addr_t* recv_addr; ulint i; ibool has_printed = FALSE; mtr_t mtr; loop: mutex_enter(&(recv_sys->mutex)); if (recv_sys->apply_batch_on) { mutex_exit(&(recv_sys->mutex)); os_thread_sleep(500000); goto loop; } ut_ad(!allow_ibuf == log_mutex_own()); if (!allow_ibuf) { recv_no_ibuf_operations = true; } recv_sys->apply_log_recs = TRUE; recv_sys->apply_batch_on = TRUE; for (i = 0; i < hash_get_n_cells(recv_sys->addr_hash); i++) { for (recv_addr = static_cast<recv_addr_t*>( HASH_GET_FIRST(recv_sys->addr_hash, i)); recv_addr != 0; recv_addr = static_cast<recv_addr_t*>( HASH_GET_NEXT(addr_hash, recv_addr))) { if (srv_is_tablespace_truncated(recv_addr->space)) { ut_a(recv_sys->n_addrs); recv_addr->state = RECV_DISCARDED; recv_sys->n_addrs--; continue; } if (recv_addr->state == RECV_DISCARDED) { ut_a(recv_sys->n_addrs); recv_sys->n_addrs--; continue; } const page_id_t page_id(recv_addr->space, recv_addr->page_no); bool found; const page_size_t& page_size = fil_space_get_page_size(recv_addr->space, &found); ut_ad(found); if (recv_addr->state == RECV_NOT_PROCESSED) { if (!has_printed) { ib::info() << "Starting an apply batch" " of log records" " to the database..."; fputs("InnoDB: Progress in percent: ", stderr); has_printed = TRUE; } mutex_exit(&(recv_sys->mutex)); if (buf_page_peek(page_id)) { buf_block_t* block; mtr_start(&mtr); block = buf_page_get( page_id, page_size, RW_X_LATCH, &mtr); buf_block_dbg_add_level( block, SYNC_NO_ORDER_CHECK); recv_recover_page(FALSE, block); mtr_commit(&mtr); } else { recv_read_in_area(page_id); } mutex_enter(&(recv_sys->mutex)); } } if (has_printed && (i * 100) / hash_get_n_cells(recv_sys->addr_hash) != ((i + 1) * 100) / hash_get_n_cells(recv_sys->addr_hash)) { fprintf(stderr, "%lu ", (ulong) ((i * 100) / hash_get_n_cells(recv_sys->addr_hash))); } } /* Wait until all the pages have been processed */ while (recv_sys->n_addrs != 0) { mutex_exit(&(recv_sys->mutex)); os_thread_sleep(500000); mutex_enter(&(recv_sys->mutex)); } if (has_printed) { fprintf(stderr, "\n"); } if (!allow_ibuf) { /* Flush all the file pages to disk and invalidate them in the buffer pool */ ut_d(recv_no_log_write = true); mutex_exit(&(recv_sys->mutex)); log_mutex_exit(); /* Stop the recv_writer thread from issuing any LRU flush batches. */ mutex_enter(&recv_sys->writer_mutex); /* Wait for any currently run batch to end. */ buf_flush_wait_LRU_batch_end(); os_event_reset(recv_sys->flush_end); recv_sys->flush_type = BUF_FLUSH_LIST; os_event_set(recv_sys->flush_start); os_event_wait(recv_sys->flush_end); buf_pool_invalidate(); /* Allow batches from recv_writer thread. */ mutex_exit(&recv_sys->writer_mutex); log_mutex_enter(); mutex_enter(&(recv_sys->mutex)); ut_d(recv_no_log_write = false); recv_no_ibuf_operations = false; } recv_sys->apply_log_recs = FALSE; recv_sys->apply_batch_on = FALSE; recv_sys_empty_hash(); if (has_printed) { ib::info() << "Apply batch completed"; } mutex_exit(&(recv_sys->mutex)); }
  • recv_apply_hashed_log_recs()函数的作用是将存储的日志记录从哈希表中清空,并将它们应用到相应的页面中。下面是对该函数的详细解析。

1、设置正在进行应用状态

recv_sys->apply_log_recs = TRUE; recv_sys->apply_batch_on = TRUE;

2、取出哈希节点:
redo日志最终存储在recv_sys->addr_hash哈希表结构如下所示:
在这里插入图片描述

  • 因此想要取出哈希节点需要进行2次for循环:
  • 第1次for循环,是对哈希桶进行遍历。
  • 第2次for循环,对哈希链表进行变量,此时每个节点代表每个将进行应用的数据页
for (i = 0; i < hash_get_n_cells(recv_sys->addr_hash); i++) { for (recv_addr = static_cast<recv_addr_t*>( HASH_GET_FIRST(recv_sys->addr_hash, i)); recv_addr != 0; recv_addr = static_cast<recv_addr_t*>( HASH_GET_NEXT(addr_hash, recv_addr)))

3、如果表已经truncate

  • 检查当前表是否truncate,修改truncate表的哈希节点状态为RECV_DISCARDED,并且减少n_addrs
if (srv_is_tablespace_truncated(recv_addr->space)) { /* Avoid applying REDO log for the tablespace that is schedule for TRUNCATE. */ ut_a(recv_sys->n_addrs); recv_addr->state = RECV_DISCARDED; recv_sys->n_addrs--; continue; }

4、缓冲池中存在当前数据页,调用recv_recover_page宏。

  • recv_recover_page宏将调用recv_recover_page_func()进行应用操作。
  • 当前if条件下的流程是1个mtr。

5、如果不在缓冲池中,调用recv_read_in_area()读取数据页到缓冲池。

  • recv_read_in_area()函数后续采用异步读取的方式,由io_handler_thread完成读取,并在读取完成后判断是否在崩溃恢复过程中,在崩溃恢复过程中随后进行应用操作。
if (buf_page_peek(page_id)) { buf_block_t* block; mtr_start(&mtr); block = buf_page_get( page_id, page_size, RW_X_LATCH, &mtr); buf_block_dbg_add_level( block, SYNC_NO_ORDER_CHECK); recv_recover_page(FALSE, block); mtr_commit(&mtr); } else { recv_read_in_area(page_id); }

6、等待所有数据页应用完成。

while (recv_sys->n_addrs != 0) { mutex_exit(&(recv_sys->mutex)); os_thread_sleep(500000); mutex_enter(&(recv_sys->mutex)); }

7、修改全局状态并且情况recv_sys的哈希表。

  • recv_sys_empty_hash()在清理了recv_sys旧的哈希表后,为recv_sys分配了新的哈希表。
recv_sys->apply_log_recs = FALSE; recv_sys->apply_batch_on = FALSE; recv_sys_empty_hash();

该函数的大致流程如下图所示:
在这里插入图片描述

2、recv_recover_page_func()函数流程

void recv_recover_page_func( buf_block_t* block) /*!< in/out: buffer block */ { page_t* page; page_zip_des_t* page_zip; recv_addr_t* recv_addr; recv_t* recv; byte* buf; lsn_t start_lsn; lsn_t end_lsn; lsn_t page_lsn; lsn_t page_newest_lsn; ibool modification_to_page; mtr_t mtr; mutex_enter(&(recv_sys->mutex)); if (recv_sys->apply_log_recs == FALSE) { mutex_exit(&(recv_sys->mutex)); return; } recv_addr = recv_get_fil_addr_struct(block->page.id.space(), block->page.id.page_no()); if ((recv_addr == NULL) || (recv_addr->state == RECV_BEING_PROCESSED) || (recv_addr->state == RECV_PROCESSED)) { ut_ad(recv_addr == NULL || recv_needed_recovery); mutex_exit(&(recv_sys->mutex)); return; } recv_addr->state = RECV_BEING_PROCESSED; mutex_exit(&(recv_sys->mutex)); mtr_start(&mtr); mtr_set_log_mode(&mtr, MTR_LOG_NONE); page = block->frame; page_zip = buf_block_get_page_zip(block); /* Read the newest modification lsn from the page */ page_lsn = mach_read_from_8(page + FIL_PAGE_LSN); modification_to_page = FALSE; start_lsn = end_lsn = 0; recv = UT_LIST_GET_FIRST(recv_addr->rec_list); while (recv) { end_lsn = recv->end_lsn; ut_ad(end_lsn <= UT_LIST_GET_FIRST(log_sys->log_groups)->scanned_lsn); if (recv->len > RECV_DATA_BLOCK_SIZE) { /* We have to copy the record body to a separate buffer */ buf = static_cast<byte*>(ut_malloc_nokey(recv->len)); recv_data_copy_to_buf(buf, recv); } else { buf = ((byte*)(recv->data)) + sizeof(recv_data_t); } if (recv->type == MLOG_INIT_FILE_PAGE) { page_lsn = page_newest_lsn; memset(FIL_PAGE_LSN + page, 0, 8); memset(UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM + page, 0, 8); if (page_zip) { memset(FIL_PAGE_LSN + page_zip->data, 0, 8); } } bool skip_recv = false; if (srv_was_tablespace_truncated(fil_space_get(recv_addr->space))) { lsn_t init_lsn = truncate_t::get_truncated_tablespace_init_lsn( recv_addr->space); skip_recv = (recv->start_lsn < init_lsn); } if (recv->start_lsn >= page_lsn && !srv_is_tablespace_truncated(recv_addr->space) && !skip_recv) { lsn_t end_lsn; if (!modification_to_page) { modification_to_page = TRUE; start_lsn = recv->start_lsn; } DBUG_PRINT("ib_log", ("apply " LSN_PF ":" " %s len " ULINTPF " page %u:%u", recv->start_lsn, get_mlog_string(recv->type), recv->len, recv_addr->space, recv_addr->page_no)); recv_parse_or_apply_log_rec_body( recv->type, buf, buf + recv->len, recv_addr->space, recv_addr->page_no, block, &mtr); end_lsn = recv->start_lsn + recv->len; mach_write_to_8(FIL_PAGE_LSN + page, end_lsn); mach_write_to_8(UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM + page, end_lsn); if (page_zip) { mach_write_to_8(FIL_PAGE_LSN + page_zip->data, end_lsn); } } if (recv->len > RECV_DATA_BLOCK_SIZE) { ut_free(buf); } recv = UT_LIST_GET_NEXT(rec_list, recv); } mtr.discard_modifications(); mtr_commit(&mtr); mutex_enter(&(recv_sys->mutex)); if (recv_max_page_lsn < page_lsn) { recv_max_page_lsn = page_lsn; } recv_addr->state = RECV_PROCESSED; ut_a(recv_sys->n_addrs); recv_sys->n_addrs--; mutex_exit(&(recv_sys->mutex)); }
  • recv_recover_page_func()函数中,如果页面的日志序列号(LSN)小于日志记录的LSN,则将哈希日志记录应用到该页面。这可以在缓冲区页面刚刚被读入时调用,也可以对已经在缓冲区池中的页面调用。下面是对该函数的详细解析。

1、根据表空间号和页号取出对应日志链表。

recv_addr = recv_get_fil_addr_struct(block->page.id.space(), block->page.id.page_no()); if ((recv_addr == NULL) || (recv_addr->state == RECV_BEING_PROCESSED) || (recv_addr->state == RECV_PROCESSED)) { ut_ad(recv_addr == NULL || recv_needed_recovery); mutex_exit(&(recv_sys->mutex)); return; }

2、修改日志链表状态为RECV_BEING_PROCESSED

recv_addr->state = RECV_BEING_PROCESSED;

3、从数据页中读取最后被修改的lsn值,如果是在热备份状态下,从缓冲池控制块读取lsn

page_lsn = mach_read_from_8(page + FIL_PAGE_LSN); #ifndef UNIV_HOTBACKUP page_newest_lsn = buf_page_get_newest_modification(&block->page); if (page_newest_lsn) { page_lsn = page_newest_lsn; } #else page_newest_lsn = 0; #endif

4、从日志链表读取日志,并且对每个日志进行循环。

recv = UT_LIST_GET_FIRST(recv_addr->rec_list); while (recv) { ... }

5、如果日志长度过长,采用动态分配空间将redo日志拷贝到这块内存中,否则直接使用recv中的数据。

if (recv->len > RECV_DATA_BLOCK_SIZE) { buf = static_cast<byte*>(ut_malloc_nokey(recv->len)); recv_data_copy_to_buf(buf, recv); } else { buf = ((byte*)(recv->data)) + sizeof(recv_data_t); }

6、如果当前表truncate,跳过truncate操作之前的redo日志

bool skip_recv = false; if (srv_was_tablespace_truncated(fil_space_get(recv_addr->space))) { lsn_t init_lsn = truncate_t::get_truncated_tablespace_init_lsn( recv_addr->space); skip_recv = (recv->start_lsn < init_lsn); }

7、如果日志的lsn>页的lsn,调用recv_parse_or_apply_log_rec_body()进行日志应用,并在应用完成后,更新数据页中的lsn字段。

if (!modification_to_page) { modification_to_page = TRUE; start_lsn = recv->start_lsn; } DBUG_PRINT("ib_log", ("apply " LSN_PF ":" " %s len " ULINTPF " page %u:%u", recv->start_lsn, get_mlog_string(recv->type), recv->len, recv_addr->space, recv_addr->page_no)); recv_parse_or_apply_log_rec_body( recv->type, buf, buf + recv->len, recv_addr->space, recv_addr->page_no, block, &mtr); end_lsn = recv->start_lsn + recv->len; mach_write_to_8(FIL_PAGE_LSN + page, end_lsn); mach_write_to_8(UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM + page, end_lsn); if (page_zip) { mach_write_to_8(FIL_PAGE_LSN + page_zip->data, end_lsn); }

8、如果当前redo日志使用的动态分配的内存,释放这部分内存,并且调用UT_LIST_GET_NEXT宏,获取日志链表的下一条日志,之后进入下一轮循环。

if (recv->len > RECV_DATA_BLOCK_SIZE) { ut_free(buf); } recv = UT_LIST_GET_NEXT(rec_list, recv);

9、调用buf_flush_recv_note_modificatio()函数,更新数据页缓冲池控制块的newest_modification,并且在flush链表中重新排序
10、应用流程接收修改日志链表状态为RECV_PROCESSED并且减少n_addrs

recv_addr->state = RECV_PROCESSED; ut_a(recv_sys->n_addrs); recv_sys->n_addrs--; mutex_exit(&(recv_sys->mutex));

该函数的大致流程如下图所示:
在这里插入图片描述

3、recv_parse_or_apply_log_rec_body()函数流程

static byte* recv_parse_or_apply_log_rec_body( mlog_id_t type, byte* ptr, byte* end_ptr, ulint space_id, ulint page_no, buf_block_t* block, mtr_t* mtr) { ut_ad(!block == !mtr); switch (type) { /* 根据日志类型,调用对应函数 */ ... default: break; } dict_index_t* index = NULL; page_t* page; page_zip_des_t* page_zip; #ifdef UNIV_DEBUG ulint page_type; #endif /* UNIV_DEBUG */ if (block) { /* Applying a page log record. */ page = block->frame; page_zip = buf_block_get_page_zip(block); ut_d(page_type = fil_page_get_type(page)); } else { /* Parsing a page log record. */ page = NULL; page_zip = NULL; ut_d(page_type = FIL_PAGE_TYPE_ALLOCATED); } const byte* old_ptr = ptr; switch (type) { /* 根据日志类型,调用对应函数 */ ... break; } ptr = mlog_parse_nbytes(type, ptr, end_ptr, page, page_zip); if (ptr != NULL && page != NULL && page_no == 0 && type == MLOG_4BYTES) { ulint offs = mach_read_from_2(old_ptr); switch (offs) { fil_space_t* space; ulint val; default: break; /* 根据日志类型,调用对应函数 */ ... } } } break; case MLOG_REC_INSERT: case MLOG_COMP_REC_INSERT: ... ... if (index) { dict_table_t* table = index->table; dict_mem_index_free(index); dict_mem_table_free(table); } return(ptr); }
  • recv_parse_or_apply_log_rec_body()函数的主要作用是根据redo日志类型,对redo日志的数据部分进行解析和应用的操作都是在该函数中完成的,该函数根据传入参数blcok是否为NULL,来区分是否进行日志应用操作,如果block == NULL,后续只进行日志解析,不进行日志应用。
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论