这篇文章介绍 bgrewriteaof 命令的源码实现。
本篇文章中,源码采用 redis 5.0.8 版本。
作用
The rewrite will create a small optimized version of the current Append Only File.
随着 AOF 文件越来越大,里面有大量重复或者可以合并的命令,重写的目的就是减少 AOF 文件尺寸,加快数据恢复时间。
特点
If a Redis child is creating a snapshot on disk, the AOF rewrite is scheduled but not started until the saving child producing the RDB file terminates.
Since Redis 2.4 the AOF rewrite is automatically triggered by Redis, however the BGREWRITEAOF command can be used to trigger a rewrite at any time.
即:生成快照和 AOF 重写不能同时进行;Redis 2.4 版本以后,AOF 重写机制可以自动触发。
下面,我们就来看看 bgrewriteaof 命令是如何实现的。
bgrewriteaof 命令的入口函数是:bgrewriteaofCommand()。
/* BGREWRITEAOF command handler.
 *
 * - Refuses the request if an AOF rewrite child is already running.
 * - Defers the rewrite (sets server.aof_rewrite_scheduled) if an RDB child
 *   is running; serverCron() starts it once no child is active.
 * - Otherwise starts the rewrite immediately via
 *   rewriteAppendOnlyFileBackground(), replying with an error on failure. */
void bgrewriteaofCommand(client *c) {
    if (server.aof_child_pid != -1) {
        addReplyError(c,"Background append only file rewriting already in progress");
        return;
    }

    if (server.rdb_child_pid != -1) {
        /* RDB save in progress: only one child at a time, so schedule. */
        server.aof_rewrite_scheduled = 1;
        addReplyStatus(c,"Background append only file rewriting scheduled");
        return;
    }

    if (rewriteAppendOnlyFileBackground() != C_OK) {
        addReply(c,shared.err);
        return;
    }
    addReplyStatus(c,"Background append only file rewriting started");
}
正常情况下,bgrewriteaofCommand() 函数调用 rewriteAppendOnlyFileBackground() 函数执行 AOF 重写。
特点1:生成快照和 AOF 重写不能同时进行。
void bgrewriteaofCommand(client *c)else if (server.rdb_child_pid != -1) {server.aof_rewrite_scheduled = 1;addReplyStatus(c,"Background append only file rewriting scheduled");}
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)/* Start a scheduled AOF rewrite if this was requested by the user while* a BGSAVE was in progress. */if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&server.aof_rewrite_scheduled){rewriteAppendOnlyFileBackground();}
如果执行 bgrewriteaof 命令时,Redis 正在生成快照(server.rdb_child_pid != -1),AOF 重写就不能立即开始。此时命令只会设置 server.aof_rewrite_scheduled = 1,并回复 scheduled。等快照生成结束(既没有 RDB 子进程,也没有 AOF 子进程)以后,serverCron() 函数再调用 rewriteAppendOnlyFileBackground() 函数执行 AOF 重写。
特点2:AOF 重写机制可以自动触发。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)/* Trigger an AOF rewrite if needed. */if (server.aof_state == AOF_ON &&server.rdb_child_pid == -1 &&server.aof_child_pid == -1 &&server.aof_rewrite_perc &&server.aof_current_size > server.aof_rewrite_min_size){long long base = server.aof_rewrite_base_size ?server.aof_rewrite_base_size : 1;long long growth = (server.aof_current_size*100/base) - 100;if (growth >= server.aof_rewrite_perc) {serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);rewriteAppendOnlyFileBackground();}}
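满足以下全部条件时,serverCron() 函数会自动调用 rewriteAppendOnlyFileBackground() 函数执行 AOF 重写:
- AOF 已开启(server.aof_state == AOF_ON);
- 当前没有正在运行的 RDB 子进程和 AOF 重写子进程;
- 设置了增长百分比阈值(server.aof_rewrite_perc 非 0);
- 当前 AOF 文件大小超过 server.aof_rewrite_min_size;
- 相对于基准大小 server.aof_rewrite_base_size 的增长百分比不低于 server.aof_rewrite_perc。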
由此可见,无论在何种情况下,执行 AOF 重写都是由 rewriteAppendOnlyFileBackground() 函数完成的。
/* This is how rewriting of the append only file in background works:** 1) The user calls BGREWRITEAOF* 2) Redis calls this function, that forks():* 2a) the child rewrite the append only file in a temp file.* 2b) the parent accumulates differences in server.aof_rewrite_buf.* 3) When the child finished '2a' exists.* 4) The parent will trap the exit code, if it's OK, will append the* data accumulated into server.aof_rewrite_buf into the temp file, and* finally will rename(2) the temp file in the actual file name.* The the new file is reopened as the new append only file. Profit!*/int rewriteAppendOnlyFileBackground(void) {pid_t childpid;long long start;if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;if (aofCreatePipes() != C_OK) return C_ERR;openChildInfoPipe();start = ustime();if ((childpid = fork()) == 0) {char tmpfile[256];/* Child */closeListeningSockets(0);redisSetProcTitle("redis-aof-rewrite");snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());if (rewriteAppendOnlyFile(tmpfile) == C_OK) {size_t private_dirty = zmalloc_get_private_dirty(-1);if (private_dirty) {serverLog(LL_NOTICE,"AOF rewrite: %zu MB of memory used by copy-on-write",private_dirty/(1024*1024));}server.child_info_data.cow_size = private_dirty;sendChildInfo(CHILD_INFO_TYPE_AOF);exitFromChild(0);} else {exitFromChild(1);}} else {/* Parent */server.stat_fork_time = ustime()-start;server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 server.stat_fork_time (1024*1024*1024); /* GB per second. 
*/latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);if (childpid == -1) {closeChildInfoPipe();serverLog(LL_WARNING,"Can't rewrite append only file in background: fork: %s",strerror(errno));aofClosePipes();return C_ERR;}serverLog(LL_NOTICE,"Background append only file rewriting started by pid %d",childpid);server.aof_rewrite_scheduled = 0;server.aof_rewrite_time_start = time(NULL);server.aof_child_pid = childpid;updateDictResizePolicy();/* We set appendseldb to -1 in order to force the next call to the* feedAppendOnlyFile() to issue a SELECT command, so the differences* accumulated by the parent into server.aof_rewrite_buf will start* with a SELECT statement and it will be safe to merge. */server.aof_selected_db = -1;replicationScriptCacheFlush();return C_OK;}return C_OK; /* unreached */}
rewriteAppendOnlyFileBackground() 函数 fork 一个子进程,子进程调用 rewriteAppendOnlyFile() 函数执行重写过程。
/* Write a sequence of commands able to fully rebuild the dataset into* "filename". Used both by REWRITEAOF and BGREWRITEAOF.** In order to minimize the number of commands needed in the rewritten* log Redis uses variadic commands when possible, such as RPUSH, SADD* and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time* are inserted using a single command. */int rewriteAppendOnlyFile(char *filename) {rio aof;FILE *fp;char tmpfile[256];char byte;/* Note that we have to use a different temp name here compared to the* one used by rewriteAppendOnlyFileBackground() function. */snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());fp = fopen(tmpfile,"w");if (!fp) {serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));return C_ERR;}server.aof_child_diff = sdsempty();rioInitWithFile(&aof,fp);if (server.aof_rewrite_incremental_fsync)rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);if (server.aof_use_rdb_preamble) {int error;if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {errno = error;goto werr;}} else {if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;}/* Do an initial slow fsync here while the parent is still sending* data, in order to make the next final fsync faster. */if (fflush(fp) == EOF) goto werr;if (fsync(fileno(fp)) == -1) goto werr;/* Read again a few times to get more data from the parent.* We can't read forever (the server may receive data from clients* faster than it is able to send data to the child), so we try to read* some more data in a loop as soon as there is a good chance more data* will come. If it looks like we are wasting time, we abort (this* happens after 20 ms without new data). */int nodata = 0;mstime_t start = mstime();while(mstime()-start < 1000 && nodata < 20) {if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0){nodata++;continue;}nodata = 0; /* Start counting from zero, we stop on N *contiguous*timeouts. 
*/aofReadDiffFromParent();}/* Ask the master to stop sending diffs. */if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)goto werr;/* We read the ACK from the server using a 10 seconds timeout. Normally* it should reply ASAP, but just in case we lose its reply, we are sure* the child will eventually get terminated. */if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||byte != '!') goto werr;serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");/* Read the final diff if any. */aofReadDiffFromParent();/* Write the received diff to the file. */serverLog(LL_NOTICE,"Concatenating %.2f MB of AOF diff received from parent.",(double) sdslen(server.aof_child_diff) (1024*1024));if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)goto werr;/* Make sure data will not remain on the OS's output buffers */if (fflush(fp) == EOF) goto werr;if (fsync(fileno(fp)) == -1) goto werr;if (fclose(fp) == EOF) goto werr;/* Use RENAME to make sure the DB file is changed atomically only* if the generate DB file is ok. */if (rename(tmpfile,filename) == -1) {serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));unlink(tmpfile);return C_ERR;}serverLog(LL_NOTICE,"SYNC append only file rewrite performed");return C_OK;werr:serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));fclose(fp);unlink(tmpfile);return C_ERR;}
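rewriteAppendOnlyFile() 函数的执行过程如下:
1. 打开临时文件 temp-rewriteaof-%d.aof,调用 rewriteAppendOnlyFileRio() 函数将内存数据转换成命令,写入临时文件。如果开启了 server.aof_use_rdb_preamble,则改为调用 rdbSaveRio() 以 RDB 格式写入。
2. 继续从父进程读取增量数据。
3. 通过管道发送 ! 通知父进程停止发送,并等待父进程的确认。
4. 把收到的全部增量数据追加到临时文件中。
5. 调用 rename() 函数,将临时文件重命名为调用者传入的文件名,即 temp-rewriteaof-bg-%d.aof。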
/* Serialize the whole dataset into "aof" as commands.
 *
 * For each non-empty DB: a SELECT <dbid>, then per key either a SET (strings)
 * or the output of the type-specific rewrite*Object() helper, followed by a
 * PEXPIREAT when the key has an expire. Every time more than
 * AOF_READ_DIFF_INTERVAL_BYTES have been written since the last read, the
 * diff sent by the parent is drained via aofReadDiffFromParent().
 *
 * Returns C_OK on success, C_ERR as soon as any write fails (the dict
 * iterator is released on both paths). Panics on an unknown object type. */
int rewriteAppendOnlyFileRio(rio *aof) {
    dictIterator *iter = NULL;
    size_t lastDiffRead = 0;

    for (int dbid = 0; dbid < server.dbnum; dbid++) {
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db + dbid;
        dict *d = db->dict;
        dictEntry *de;

        if (dictSize(d) == 0) continue;
        iter = dictGetSafeIterator(d);

        /* SELECT the new DB */
        if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0 ||
            rioWriteBulkLongLong(aof,dbid) == 0) goto werr;

        /* Iterate this DB writing every entry */
        while ((de = dictNext(iter)) != NULL) {
            sds keystr = dictGetKey(de);
            robj *o = dictGetVal(de);
            robj key;
            int ok = 0;

            initStaticStringObject(key,keystr);
            long long expiretime = getExpire(db,&key);

            /* Save the key and associated value */
            switch (o->type) {
            case OBJ_STRING: {
                /* Emit a SET command */
                char cmd[] = "*3\r\n$3\r\nSET\r\n";
                ok = rioWrite(aof,cmd,sizeof(cmd)-1) &&
                     rioWriteBulkObject(aof,&key) &&
                     rioWriteBulkObject(aof,o);
                break;
            }
            case OBJ_LIST:   ok = rewriteListObject(aof,&key,o);      break;
            case OBJ_SET:    ok = rewriteSetObject(aof,&key,o);       break;
            case OBJ_ZSET:   ok = rewriteSortedSetObject(aof,&key,o); break;
            case OBJ_HASH:   ok = rewriteHashObject(aof,&key,o);      break;
            case OBJ_STREAM: ok = rewriteStreamObject(aof,&key,o);    break;
            case OBJ_MODULE: ok = rewriteModuleObject(aof,&key,o);    break;
            default:
                serverPanic("Unknown object type");
                break;
            }
            if (!ok) goto werr;

            /* Save the expire time */
            if (expiretime != -1) {
                char cmd[] = "*3\r\n$9\r\nPEXPIREAT\r\n";
                if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0 ||
                    rioWriteBulkObject(aof,&key) == 0 ||
                    rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
            }

            /* Read some diff from the parent process from time to time. */
            if (aof->processed_bytes > lastDiffRead+AOF_READ_DIFF_INTERVAL_BYTES) {
                lastDiffRead = aof->processed_bytes;
                aofReadDiffFromParent();
            }
        }
        dictReleaseIterator(iter);
        iter = NULL;
    }
    return C_OK;

werr:
    if (iter) dictReleaseIterator(iter);
    return C_ERR;
}
rewriteAppendOnlyFileRio() 函数将内存数据转换成命令,写入到文件中。其中:
String 类型转换成 SET 命令;
List 类型转换成 RPUSH 命令;
Hash 类型转换成 HMSET 命令;
Set 类型转换成 SADD 命令;
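Sorted Set 类型转换成 ZADD 命令;
Stream 和 Module 类型分别由 rewriteStreamObject() 和 rewriteModuleObject() 函数处理。
此外,对于设置了过期时间的键,还会额外写入一条 PEXPIREAT 命令。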
在遍历过程中,每当写入的数据量比上次读取时多出 AOF_READ_DIFF_INTERVAL_BYTES 字节,就调用 aofReadDiffFromParent() 函数,从管道中读取父进程发来的增量数据,暂存到 server.aof_child_diff 中。
/* Child side of the diff channel: read everything currently available on
 * server.aof_pipe_read_data_from_parent and append it to
 * server.aof_child_diff, which is concatenated at the end of the rewrite.
 * Reading stops at the first read() that returns 0 or -1 (including, since
 * aofCreatePipes() makes this pipe non-blocking, when no data is pending).
 * Returns the number of bytes appended by this call. */
ssize_t aofReadDiffFromParent(void) {
    char chunk[65536]; /* Default pipe buffer size on most Linux systems. */
    ssize_t total = 0;

    for (;;) {
        ssize_t got = read(server.aof_pipe_read_data_from_parent,chunk,sizeof(chunk));
        if (got <= 0) break;
        server.aof_child_diff = sdscatlen(server.aof_child_diff,chunk,got);
        total += got;
    }
    return total;
}
子进程完成重写以后,调用 exitFromChild() 退出:成功时退出码为 0,失败时为 1。
接下来,父进程做最后的收尾工作:
在 serverCron() 函数中有如下代码:
/* Check if a background saving or AOF rewrite in progress terminated. */if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||ldbPendingChildren()){int statloc;pid_t pid;if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {int exitcode = WEXITSTATUS(statloc);int bysignal = 0;if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);if (pid == -1) {serverLog(LL_WARNING,"wait3() returned an error: %s. ""rdb_child_pid = %d, aof_child_pid = %d",strerror(errno),(int) server.rdb_child_pid,(int) server.aof_child_pid);} else if (pid == server.rdb_child_pid) {backgroundSaveDoneHandler(exitcode,bysignal);if (!bysignal && exitcode == 0) receiveChildInfo();} else if (pid == server.aof_child_pid) {backgroundRewriteDoneHandler(exitcode,bysignal);if (!bysignal && exitcode == 0) receiveChildInfo();} else {if (!ldbRemoveChild(pid)) {serverLog(LL_WARNING,"Warning, detected child with unmatched pid: %ld",(long)pid);}}updateDictResizePolicy();closeChildInfoPipe();}}
子进程退出以后,退出状态被父进程 wait3() 函数捕获,接着调用 backgroundRewriteDoneHandler() 函数进行处理。
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.* Handle this. */void backgroundRewriteDoneHandler(int exitcode, int bysignal)/* Flush the differences accumulated by the parent to the* rewritten AOF. */latencyStartMonitor(latency);snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",(int)server.aof_child_pid);newfd = open(tmpfile,O_WRONLY|O_APPEND);if (newfd == -1) {serverLog(LL_WARNING,"Unable to open the temporary AOF produced by the child: %s", strerror(errno));goto cleanup;}if (aofRewriteBufferWrite(newfd) == -1) {serverLog(LL_WARNING,"Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));close(newfd);goto cleanup;}latencyEndMonitor(latency);latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);serverLog(LL_NOTICE,"Residual parent diff successfully flushed to the rewritten AOF (%.2f MB)", (double) aofRewriteBufferSize() / (1024*1024));/* The only remaining thing to do is to rename the temporary file to* the configured file and switch the file descriptor used to do AOF* writes. We don't want close(2) or rename(2) calls to block the* server on old file deletion.** There are two possible scenarios:** 1) AOF is DISABLED and this was a one time rewrite. The temporary* file will be renamed to the configured file. When this file already* exists, it will be unlinked, which may block the server.** 2) AOF is ENABLED and the rewritten AOF will immediately start* receiving writes. After the temporary file is renamed to the* configured file, the original AOF file descriptor will be closed.* Since this will be the last reference to that file, closing it* causes the underlying file to be unlinked, which may block the* server.** To mitigate the blocking effect of the unlink operation (either* caused by rename(2) in scenario 1, or by close(2) in scenario 2), we* use a background thread to take care of this. First, we* make scenario 1 identical to scenario 2 by opening the target file* when it exists. 
The unlink operation after the rename(2) will then* be executed upon calling close(2) for its descriptor. Everything to* guarantee atomicity for this switch has already happened by then, so* we don't care what the outcome or duration of that close operation* is, as long as the file descriptor is released again. */if (server.aof_fd == -1) {/* AOF disabled *//* Don't care if this fails: oldfd will be -1 and we handle that.* One notable case of -1 return is if the old file does* not exist. */oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);} else {/* AOF enabled */oldfd = -1; /* We'll set this to the current AOF filedes later. */}/* Rename the temporary file. This will not unlink the target file if* it exists, because we reference it with "oldfd". */latencyStartMonitor(latency);if (rename(tmpfile,server.aof_filename) == -1) {serverLog(LL_WARNING,"Error trying to rename the temporary AOF file %s into %s: %s",tmpfile,server.aof_filename,strerror(errno));close(newfd);if (oldfd != -1) close(oldfd);goto cleanup;}latencyEndMonitor(latency);latencyAddSampleIfNeeded("aof-rename",latency);
backgroundRewriteDoneHandler() 函数打开临时文件 temp-rewriteaof-bg-%d.aof,调用 aofRewriteBufferWrite() 函数将缓存中的增量数据写入到临时文件中,最后调用 rename() 函数将临时文件名修改为 server.aof_filename。
注意:这里的临时文件和子进程最终保存的临时文件是同一个文件(即:temp-rewriteaof-bg-%d.aof)。
/* Write the buffer (possibly composed of multiple blocks) into the specified* fd. If a short write or any other error happens -1 is returned,* otherwise the number of bytes written is returned. */ssize_t aofRewriteBufferWrite(int fd) {listNode *ln;listIter li;ssize_t count = 0;listRewind(server.aof_rewrite_buf_blocks,&li);while((ln = listNext(&li))) {aofrwblock *block = listNodeValue(ln);ssize_t nwritten;if (block->used) {nwritten = write(fd,block->buf,block->used);if (nwritten != (ssize_t)block->used) {if (nwritten == 0) errno = EIO;return -1;}count += nwritten;}}return count;}
backgroundRewriteDoneHandler() 函数执行完成以后,bgrewriteaof 命令就结束了。
前面我们一直提到增量数据,那么增量数据是哪里来的呢?
redis 命令的执行,最终都是通过调用 call() 函数来实现的。
void call(client *c, int flags)/* Propagate the command into the AOF and replication link */if (flags & CMD_CALL_PROPAGATE &&(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP){int propagate_flags = PROPAGATE_NONE;/* Check if the command operated changes in the data set. If so* set for replication AOF propagation. */if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);/* If the client forced AOF replication of the command, set* the flags regardless of the command effects on the data set. */if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;/* However prevent AOF replication propagation if the command* implementations called preventCommandPropagation() or similar,* or if we don't have the call() flags to do so. */if (c->flags & CLIENT_PREVENT_REPL_PROP ||!(flags & CMD_CALL_PROPAGATE_REPL))propagate_flags &= ~PROPAGATE_REPL;if (c->flags & CLIENT_PREVENT_AOF_PROP ||!(flags & CMD_CALL_PROPAGATE_AOF))propagate_flags &= ~PROPAGATE_AOF;/* Call propagate() only if at least one of AOF replication* propagation is needed. Note that modules commands handle replication* in an explicit way, so we never replicate them automatically. */if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);}
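并不是每条命令都会调用 propagate() 函数。命令执行后,需要同时满足以下条件,call() 函数才会调用 propagate():
- 命令修改了数据集(dirty 非 0),或者客户端设置了 CLIENT_FORCE_REPL / CLIENT_FORCE_AOF 要求强制传播;
- 传播没有被禁止;
- 不是模块命令(CMD_MODULE),模块命令需要自行处理传播。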
/* Propagate the specified command (in the context of the specified database
 * id) to the AOF and/or the replicas.
 *
 * "flags" is a bitwise OR of:
 *   PROPAGATE_NONE - no propagation at all
 *   PROPAGATE_AOF  - feed the AOF (only if AOF is not OFF)
 *   PROPAGATE_REPL - feed the replication link
 *
 * This should not be used inside commands implementation. Use instead
 * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation(). */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    int toAof = server.aof_state != AOF_OFF && (flags & PROPAGATE_AOF);
    int toReplicas = (flags & PROPAGATE_REPL) != 0;

    if (toAof) feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (toReplicas) replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
/* Encode a just-executed command for the AOF and route it to:
 *  - server.aof_buf when AOF is ON. It is flushed to disk before re-entering
 *    the event loop, so before the client gets a positive reply.
 *  - the AOF rewrite buffer when a rewrite child is running, so the child's
 *    snapshot can later be completed with this difference.
 *
 * A SELECT is prepended whenever "dictid" differs from the last DB written.
 * Expire-related commands are normalized to PEXPIREAT:
 *  - EXPIRE/PEXPIRE/EXPIREAT          -> PEXPIREAT
 *  - SETEX/PSETEX                     -> SET + PEXPIREAT
 *  - SET key value ... (argc > 3)     -> SET key value (+ PEXPIREAT if EX/PX)
 * All other commands are written unchanged. */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds out = sdsempty();

    /* The DB this command was targeting is not the same as the last command
     * we appended. To issue a SELECT command is needed. */
    if (dictid != server.aof_selected_db) {
        char dbstr[64];

        snprintf(dbstr,sizeof(dbstr),"%d",dictid);
        out = sdscatprintf(out,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(dbstr),dbstr);
        server.aof_selected_db = dictid;
    }

    int isExpireFamily = cmd->proc == expireCommand ||
                         cmd->proc == pexpireCommand ||
                         cmd->proc == expireatCommand;
    int isSetexFamily = cmd->proc == setexCommand || cmd->proc == psetexCommand;
    int isSetWithOptions = cmd->proc == setCommand && argc > 3;

    if (isExpireFamily) {
        out = catAppendOnlyExpireAtCommand(out,cmd,argv[1],argv[2]);
    } else if (isSetexFamily) {
        /* SETEX key ttl value -> SET key value, then PEXPIREAT key ... */
        robj *setargv[3];

        setargv[0] = createStringObject("SET",3);
        setargv[1] = argv[1];
        setargv[2] = argv[3];
        out = catAppendOnlyGenericCommand(out,3,setargv);
        decrRefCount(setargv[0]);
        out = catAppendOnlyExpireAtCommand(out,cmd,argv[1],argv[2]);
    } else if (isSetWithOptions) {
        robj *exarg = NULL, *pxarg = NULL;

        /* Keep only "SET key value"; the EX/PX option becomes a PEXPIREAT. */
        out = catAppendOnlyGenericCommand(out,3,argv);
        for (int j = 3; j < argc; j++) {
            if (!strcasecmp(argv[j]->ptr, "ex")) exarg = argv[j+1];
            if (!strcasecmp(argv[j]->ptr, "px")) pxarg = argv[j+1];
        }
        serverAssert(!(exarg && pxarg));
        if (exarg)
            out = catAppendOnlyExpireAtCommand(out,server.expireCommand,argv[1],exarg);
        if (pxarg)
            out = catAppendOnlyExpireAtCommand(out,server.pexpireCommand,argv[1],pxarg);
    } else {
        /* No translation needed: the argument vector is already what
         * replication uses. */
        out = catAppendOnlyGenericCommand(out,argc,argv);
    }

    if (server.aof_state == AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,out,sdslen(out));

    /* A rewrite child is running: accumulate the difference for it. */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)out,sdslen(out));

    sdsfree(out);
}
如果我们开启了 AOF,则 feedAppendOnlyFile() 函数将命令写入到 server.aof_buf 缓存中。
如果此时,我们正在进行 AOF 重写,则 feedAppendOnlyFile() 函数调用 aofRewriteBufferAppend() 函数将命令写入到 server.aof_rewrite_buf_blocks 缓存中,server.aof_rewrite_buf_blocks 缓存中的数据即是增量数据。
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {listNode *ln = listLast(server.aof_rewrite_buf_blocks);aofrwblock *block = ln ? ln->value : NULL;while(len) {/* If we already got at least an allocated block, try appending* at least some piece into it. */if (block) {unsigned long thislen = (block->free < len) ? block->free : len;if (thislen) { /* The current block is not already full. */memcpy(block->buf+block->used, s, thislen);block->used += thislen;block->free -= thislen;s += thislen;len -= thislen;}}if (len) { /* First block to allocate, or need another block. */int numblocks;block = zmalloc(sizeof(*block));block->free = AOF_RW_BUF_BLOCK_SIZE;block->used = 0;listAddNodeTail(server.aof_rewrite_buf_blocks,block);/* Log every time we cross more 10 or 100 blocks, respectively* as a notice or warning. */numblocks = listLength(server.aof_rewrite_buf_blocks);if (((numblocks+1) % 10) == 0) {int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :LL_NOTICE;serverLog(level,"Background AOF buffer size: %lu MB",aofRewriteBufferSize()/(1024*1024));}}}/* Install a file event to send data to the rewrite child if there is* not one already. */if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,AE_WRITABLE, aofChildWriteDiffData, NULL);}}
aofRewriteBufferAppend() 函数将命令追加到 server.aof_rewrite_buf_blocks 缓存中。每个缓存块大小为 AOF_RW_BUF_BLOCK_SIZE,写满后会分配新块。然后,如果 server.aof_pipe_write_data_to_child 管道上还没有注册写事件,就创建一个写文件事件,事件处理函数为:aofChildWriteDiffData()。
/* Writable-event handler on the parent->child data pipe.
 *
 * Sends the head block(s) of server.aof_rewrite_buf_blocks to the rewrite
 * child so that the final diff written when the child finishes stays
 * small. Bytes that have been sent are removed from the block, and an
 * emptied block is deleted from the list. The handler:
 *  - returns (keeping the event) when write() returns <= 0;
 *  - uninstalls the event when the list is empty or the child asked us to
 *    stop (server.aof_stop_sending_diff). */
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(fd);
    UNUSED(privdata);
    UNUSED(mask);

    for (;;) {
        listNode *head = listFirst(server.aof_rewrite_buf_blocks);
        aofrwblock *blk = head ? head->value : NULL;

        if (blk == NULL || server.aof_stop_sending_diff) {
            aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
                              AE_WRITABLE);
            return;
        }

        if (blk->used > 0) {
            ssize_t sent = write(server.aof_pipe_write_data_to_child,
                                 blk->buf,blk->used);
            if (sent <= 0) return; /* Try again on the next writable event. */
            /* Shift the unsent tail to the start of the block. */
            memmove(blk->buf,blk->buf+sent,blk->used-sent);
            blk->used -= sent;
            blk->free += sent;
        }
        if (blk->used == 0) listDelNode(server.aof_rewrite_buf_blocks,head);
    }
}
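aofChildWriteDiffData() 函数通过管道,将 server.aof_rewrite_buf_blocks 缓存中的数据发送给子进程,已发送的数据会从缓存中移除。当缓存为空,或者 server.aof_stop_sending_diff 被置为 1 时,该写事件会被删除。因此,子进程重写结束时,缓存中只剩下尚未发送给子进程的增量数据,这部分数据由 backgroundRewriteDoneHandler() 函数写入新的 AOF 文件。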
细节补充
前面我们介绍了 bgrewriteaof 命令的执行过程,但还有一些细节补充如下:
/* Create the pipes used for parent - child process IPC during rewrite.* We have a data pipe used to send AOF incremental diffs to the child,* and two other pipes used by the children to signal it finished with* the rewrite so no more data should be written, and another for the* parent to acknowledge it understood this new condition. */int aofCreatePipes(void) {int fds[6] = {-1, -1, -1, -1, -1, -1};int j;if (pipe(fds) == -1) goto error; /* parent -> children data. */if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */if (pipe(fds+4) == -1) goto error; /* parent -> children ack. *//* Parent -> children data is non blocking. */if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;server.aof_pipe_write_data_to_child = fds[1];server.aof_pipe_read_data_from_parent = fds[0];server.aof_pipe_write_ack_to_parent = fds[3];server.aof_pipe_read_ack_from_child = fds[2];server.aof_pipe_write_ack_to_child = fds[5];server.aof_pipe_read_ack_from_parent = fds[4];server.aof_stop_sending_diff = 0;return C_OK;error:serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",strerror(errno));for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);return C_ERR;}
在主进程 fork 子进程之前,rewriteAppendOnlyFileBackground() 函数会调用 aofCreatePipes() 函数,创建 3 个管道用于主进程与子进程之间的通信:
- 父进程→子进程的数据管道:非阻塞,用于发送增量数据;
- 子进程→父进程的 ack 管道;
- 父进程→子进程的 ack 管道。

同时,在"子进程→父进程 ack 管道"的读端上创建一个读文件事件,事件处理函数为:aofChildPipeReadable()。
/* Readable-event handler on the child->parent ack pipe.
 *
 * If the byte read is '!', the child has asked us to stop sending diffs:
 * set server.aof_stop_sending_diff and answer with a '!' ack. A failed ack
 * write is only logged and not retried; the child waits with a timeout.
 * The handler then removes itself, because it can fire only once per
 * rewrite. */
void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
    char byte;
    UNUSED(el);
    UNUSED(privdata);
    UNUSED(mask);

    int stopRequested = read(fd,&byte,1) == 1 && byte == '!';
    if (stopRequested) {
        serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
        server.aof_stop_sending_diff = 1;
        if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
            serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
                strerror(errno));
        }
    }

    aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
}
当主进程收到子进程发送的 ! 时,会做以下几件事:
- 设置 server.aof_stop_sending_diff = 1,停止向子进程发送增量数据;
- 回复一个 ! 作为确认;
- 删除该读事件。

子进程收到确认后(最多等待 5 秒),读取最后一批增量数据并写入临时文件。




