
Redis source code analysis: bgrewriteaof

谷竹 2021-02-05

This article walks through the source code behind the bgrewriteaof command.


All source code quoted here is from Redis 5.0.8.




Purpose

The rewrite will create a small optimized version of the current Append Only File.

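As the AOF file keeps growing, it accumulates many commands that are redundant or can be merged. Rewriting shrinks the AOF file and shortens recovery time. For example, a counter incremented 1,000 times with INCR ends up as a single SET holding its current value.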


Characteristics

  • If a Redis child is creating a snapshot on disk, the AOF rewrite is scheduled but not started until the saving child producing the RDB file terminates.

  • Since Redis 2.4 the AOF rewrite is automatically triggered by Redis, however the BGREWRITEAOF command can be used to trigger a rewrite at any time.

In other words, snapshot generation and AOF rewriting never run at the same time. Since Redis 2.4, AOF rewriting can also be triggered automatically.




Let's now look at how the bgrewriteaof command is implemented.


The entry point of the bgrewriteaof command is bgrewriteaofCommand():

    void bgrewriteaofCommand(client *c) {
        if (server.aof_child_pid != -1) {
            addReplyError(c,"Background append only file rewriting already in progress");
        } else if (server.rdb_child_pid != -1) {
            server.aof_rewrite_scheduled = 1;
            addReplyStatus(c,"Background append only file rewriting scheduled");
        } else if (rewriteAppendOnlyFileBackground() == C_OK) {
            addReplyStatus(c,"Background append only file rewriting started");
        } else {
            addReply(c,shared.err);
        }
    }

In the normal case, bgrewriteaofCommand() calls rewriteAppendOnlyFileBackground() to perform the AOF rewrite.


Characteristic 1: snapshot generation and AOF rewriting cannot run at the same time

    void bgrewriteaofCommand(client *c)

    } else if (server.rdb_child_pid != -1) {
        server.aof_rewrite_scheduled = 1;
        addReplyStatus(c,"Background append only file rewriting scheduled");
    }

    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)

    /* Start a scheduled AOF rewrite if this was requested by the user while
     * a BGSAVE was in progress. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }

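Suppose bgrewriteaof is issued while Redis is generating a snapshot (server.rdb_child_pid != -1). The rewrite cannot start right away, so the command only sets server.aof_rewrite_scheduled = 1. Once the snapshot finishes, serverCron() calls rewriteAppendOnlyFileBackground() to perform the AOF rewrite.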
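The interplay between bgrewriteaofCommand() and serverCron() can be modeled as a small state machine. The sketch below is a simplified, hypothetical model. struct sched_state, bgrewriteaof(), cron_tick() and start_rewrite() are illustrative names, not Redis APIs. Plain ints stand in for pids, and start_rewrite() only pretends that fork() succeeded.

```c
#include <string.h>

/* Hypothetical, simplified model of the server fields involved
 * (not the real struct redisServer). ints stand in for pid_t. */
struct sched_state {
    int rdb_child_pid;          /* -1 when no BGSAVE child is running */
    int aof_child_pid;          /* -1 when no rewrite child is running */
    int aof_rewrite_scheduled;  /* 1 if a rewrite is pending */
};

/* Stand-in for rewriteAppendOnlyFileBackground(): pretends fork() worked. */
static void start_rewrite(struct sched_state *s, int fake_pid) {
    s->aof_rewrite_scheduled = 0;
    s->aof_child_pid = fake_pid;
}

/* Mirrors the branch structure of bgrewriteaofCommand(). */
static const char *bgrewriteaof(struct sched_state *s) {
    if (s->aof_child_pid != -1)
        return "already in progress";
    if (s->rdb_child_pid != -1) {
        s->aof_rewrite_scheduled = 1;   /* defer until BGSAVE ends */
        return "scheduled";
    }
    start_rewrite(s, 1234);
    return "started";
}

/* Mirrors the check serverCron() performs on every tick. */
static void cron_tick(struct sched_state *s) {
    if (s->rdb_child_pid == -1 && s->aof_child_pid == -1 &&
        s->aof_rewrite_scheduled)
        start_rewrite(s, 5678);
}
```

In this model, a request made during a BGSAVE only flips the flag. The first cron tick after rdb_child_pid returns to -1 starts the rewrite and clears the flag.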


Characteristic 2: AOF rewriting can be triggered automatically

    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)

    /* Trigger an AOF rewrite if needed. */
    if (server.aof_state == AOF_ON &&
        server.rdb_child_pid == -1 &&
        server.aof_child_pid == -1 &&
        server.aof_rewrite_perc &&
        server.aof_current_size > server.aof_rewrite_min_size)
    {
        long long base = server.aof_rewrite_base_size ?
            server.aof_rewrite_base_size : 1;
        long long growth = (server.aof_current_size*100/base) - 100;
        if (growth >= server.aof_rewrite_perc) {
            serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
            rewriteAppendOnlyFileBackground();
        }
    }

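serverCron() calls rewriteAppendOnlyFileBackground() automatically when all of these hold:

  • AOF is on and no child process is running.
  • The AOF is larger than aof_rewrite_min_size.
  • The AOF has grown by at least aof_rewrite_perc percent relative to aof_rewrite_base_size, the size recorded after the last rewrite or at startup.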
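The trigger condition boils down to integer arithmetic. Take base = 64 MB and current = 128 MB: growth = 128 × 100 / 64 − 100 = 100, which meets a threshold of 100 (the auto-aof-rewrite-percentage default in redis.conf). The function below is a hypothetical helper that reproduces only the size checks. The aof_state and child-pid checks are left out.

```c
#include <stdbool.h>

/* Hypothetical helper: returns true if serverCron() would start an
 * automatic rewrite, using the same integer arithmetic as the excerpt.
 * Parameters stand in for the server.aof_* fields. */
static bool should_auto_rewrite(long long current_size, long long base_size,
                                long long min_size, int rewrite_perc) {
    if (rewrite_perc == 0 || current_size <= min_size) return false;
    long long base = base_size ? base_size : 1;          /* avoid div by 0 */
    long long growth = (current_size * 100 / base) - 100; /* percent growth */
    return growth >= rewrite_perc;
}
```

Because the division truncates, 120 MB over a 64 MB base yields 187 − 100 = 87, which is below a 100 percent threshold.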


So in every case, the AOF rewrite itself is performed by rewriteAppendOnlyFileBackground().


    /* This is how rewriting of the append only file in background works:
     *
     * 1) The user calls BGREWRITEAOF
     * 2) Redis calls this function, that forks():
     *    2a) the child rewrite the append only file in a temp file.
     *    2b) the parent accumulates differences in server.aof_rewrite_buf.
     * 3) When the child finished '2a' exists.
     * 4) The parent will trap the exit code, if it's OK, will append the
     *    data accumulated into server.aof_rewrite_buf into the temp file, and
     *    finally will rename(2) the temp file in the actual file name.
     *    The the new file is reopened as the new append only file. Profit!
     */
    int rewriteAppendOnlyFileBackground(void) {
        pid_t childpid;
        long long start;

        if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
        if (aofCreatePipes() != C_OK) return C_ERR;
        openChildInfoPipe();
        start = ustime();
        if ((childpid = fork()) == 0) {
            char tmpfile[256];

            /* Child */
            closeListeningSockets(0);
            redisSetProcTitle("redis-aof-rewrite");
            snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
            if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
                size_t private_dirty = zmalloc_get_private_dirty(-1);

                if (private_dirty) {
                    serverLog(LL_NOTICE,
                        "AOF rewrite: %zu MB of memory used by copy-on-write",
                        private_dirty/(1024*1024));
                }

                server.child_info_data.cow_size = private_dirty;
                sendChildInfo(CHILD_INFO_TYPE_AOF);
                exitFromChild(0);
            } else {
                exitFromChild(1);
            }
        } else {
            /* Parent */
            server.stat_fork_time = ustime()-start;
            server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
            latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
            if (childpid == -1) {
                closeChildInfoPipe();
                serverLog(LL_WARNING,
                    "Can't rewrite append only file in background: fork: %s",
                    strerror(errno));
                aofClosePipes();
                return C_ERR;
            }
            serverLog(LL_NOTICE,
                "Background append only file rewriting started by pid %d",childpid);
            server.aof_rewrite_scheduled = 0;
            server.aof_rewrite_time_start = time(NULL);
            server.aof_child_pid = childpid;
            updateDictResizePolicy();
            /* We set appendseldb to -1 in order to force the next call to the
             * feedAppendOnlyFile() to issue a SELECT command, so the differences
             * accumulated by the parent into server.aof_rewrite_buf will start
             * with a SELECT statement and it will be safe to merge. */
            server.aof_selected_db = -1;
            replicationScriptCacheFlush();
            return C_OK;
        }
        return C_OK; /* unreached */
    }

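rewriteAppendOnlyFileBackground() forks a child process, and the child calls rewriteAppendOnlyFile() to perform the rewrite. The parent records the child's pid in server.aof_child_pid and returns immediately.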
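A detail worth noticing is that the child names its output file after its own pid. The parent later reconstructs the same name from the pid returned by fork(). The POSIX sketch below illustrates that contract. fork_and_write() and the temp-sketch-bg- prefix are hypothetical. Unlike Redis, the parent here blocks in waitpid() instead of polling from serverCron().

```c
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: forks a child that writes `payload` to
 * temp-sketch-bg-<child pid>.aof, exiting 0 on success and 1 on failure.
 * The parent rebuilds the file name from the child's pid into `out`.
 * Returns the child's exit code, or -1 on error. */
static int fork_and_write(const char *payload, char *out, size_t outlen) {
    pid_t pid = fork();
    if (pid == -1) return -1;
    if (pid == 0) {
        /* Child: name the file after its own pid. */
        char tmp[256];
        snprintf(tmp, sizeof(tmp), "temp-sketch-bg-%d.aof", (int) getpid());
        FILE *fp = fopen(tmp, "w");
        if (!fp) _exit(1);
        int ok = fputs(payload, fp) != EOF;
        if (fclose(fp) == EOF) ok = 0;
        _exit(ok ? 0 : 1);   /* _exit: don't flush the parent's stdio copies */
    }
    /* Parent: the pid alone is enough to find the child's file. */
    snprintf(out, outlen, "temp-sketch-bg-%d.aof", (int) pid);
    int status;
    if (waitpid(pid, &status, 0) == -1) return -1;
    return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```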


    /* Write a sequence of commands able to fully rebuild the dataset into
     * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
     *
     * In order to minimize the number of commands needed in the rewritten
     * log Redis uses variadic commands when possible, such as RPUSH, SADD
     * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
     * are inserted using a single command. */
    int rewriteAppendOnlyFile(char *filename) {
        rio aof;
        FILE *fp;
        char tmpfile[256];
        char byte;

        /* Note that we have to use a different temp name here compared to the
         * one used by rewriteAppendOnlyFileBackground() function. */
        snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
        fp = fopen(tmpfile,"w");
        if (!fp) {
            serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
            return C_ERR;
        }

        server.aof_child_diff = sdsempty();
        rioInitWithFile(&aof,fp);

        if (server.aof_rewrite_incremental_fsync)
            rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);

        if (server.aof_use_rdb_preamble) {
            int error;
            if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
                errno = error;
                goto werr;
            }
        } else {
            if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
        }

        /* Do an initial slow fsync here while the parent is still sending
         * data, in order to make the next final fsync faster. */
        if (fflush(fp) == EOF) goto werr;
        if (fsync(fileno(fp)) == -1) goto werr;

        /* Read again a few times to get more data from the parent.
         * We can't read forever (the server may receive data from clients
         * faster than it is able to send data to the child), so we try to read
         * some more data in a loop as soon as there is a good chance more data
         * will come. If it looks like we are wasting time, we abort (this
         * happens after 20 ms without new data). */
        int nodata = 0;
        mstime_t start = mstime();
        while(mstime()-start < 1000 && nodata < 20) {
            if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
            {
                nodata++;
                continue;
            }
            nodata = 0; /* Start counting from zero, we stop on N *contiguous*
                           timeouts. */
            aofReadDiffFromParent();
        }

        /* Ask the master to stop sending diffs. */
        if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
        if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
            goto werr;
        /* We read the ACK from the server using a 10 seconds timeout. Normally
         * it should reply ASAP, but just in case we lose its reply, we are sure
         * the child will eventually get terminated. */
        if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
            byte != '!') goto werr;
        serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");

        /* Read the final diff if any. */
        aofReadDiffFromParent();

        /* Write the received diff to the file. */
        serverLog(LL_NOTICE,
            "Concatenating %.2f MB of AOF diff received from parent.",
            (double) sdslen(server.aof_child_diff) / (1024*1024));
        if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
            goto werr;

        /* Make sure data will not remain on the OS's output buffers */
        if (fflush(fp) == EOF) goto werr;
        if (fsync(fileno(fp)) == -1) goto werr;
        if (fclose(fp) == EOF) goto werr;

        /* Use RENAME to make sure the DB file is changed atomically only
         * if the generate DB file is ok. */
        if (rename(tmpfile,filename) == -1) {
            serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
            unlink(tmpfile);
            return C_ERR;
        }
        serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
        return C_OK;

    werr:
        serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
        fclose(fp);
        unlink(tmpfile);
        return C_ERR;
    }

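rewriteAppendOnlyFile() works in these steps:

  • It opens the temporary file temp-rewriteaof-%d.aof and calls rewriteAppendOnlyFileRio() to convert the in-memory data into commands written to it. If aof-use-rdb-preamble is enabled, rdbSaveRio() writes the dataset in RDB format instead.
  • It keeps reading incremental data from the parent for up to about one second. It stops early after 20 consecutive 1 ms waits that bring no data.
  • It writes "!" to ask the parent to stop sending diffs, then waits for the parent's "!" acknowledgment.
  • It appends all received diffs to the file, then flushes, fsyncs and closes it.
  • Finally, it calls rename() to give the file the name passed in by rewriteAppendOnlyFileBackground(), i.e. temp-rewriteaof-bg-%d.aof.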
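The write-to-temp, flush, fsync, rename sequence at the end of rewriteAppendOnlyFile() is a general pattern for replacing a file atomically. A minimal standalone version might look like the following. write_then_rename() is a hypothetical helper that keeps the same ordering and the same unlink-on-failure cleanup.

```c
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <unistd.h>

/* Hypothetical helper: writes `data` to `tmpfile`, forces it to disk,
 * then atomically renames it to `filename`. Returns 0 or -1. */
static int write_then_rename(const char *tmpfile, const char *filename,
                             const char *data, size_t len) {
    FILE *fp = fopen(tmpfile, "w");
    if (!fp) return -1;
    if (fwrite(data, 1, len, fp) != len) goto err;
    if (fflush(fp) == EOF) goto err;         /* stdio buffer -> kernel */
    if (fsync(fileno(fp)) == -1) goto err;   /* kernel cache -> device */
    if (fclose(fp) == EOF) { fp = NULL; goto err; }
    if (rename(tmpfile, filename) == -1) { unlink(tmpfile); return -1; }
    return 0;
err:
    if (fp) fclose(fp);
    unlink(tmpfile);
    return -1;
}
```

Calling fsync() before rename() ensures the file's contents have reached the disk before the file becomes visible under its final name.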


    int rewriteAppendOnlyFileRio(rio *aof) {
        dictIterator *di = NULL;
        dictEntry *de;
        size_t processed = 0;
        int j;

        for (j = 0; j < server.dbnum; j++) {
            char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
            redisDb *db = server.db+j;
            dict *d = db->dict;
            if (dictSize(d) == 0) continue;
            di = dictGetSafeIterator(d);

            /* SELECT the new DB */
            if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
            if (rioWriteBulkLongLong(aof,j) == 0) goto werr;

            /* Iterate this DB writing every entry */
            while((de = dictNext(di)) != NULL) {
                sds keystr;
                robj key, *o;
                long long expiretime;

                keystr = dictGetKey(de);
                o = dictGetVal(de);
                initStaticStringObject(key,keystr);

                expiretime = getExpire(db,&key);

                /* Save the key and associated value */
                if (o->type == OBJ_STRING) {
                    /* Emit a SET command */
                    char cmd[]="*3\r\n$3\r\nSET\r\n";
                    if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                    /* Key and value */
                    if (rioWriteBulkObject(aof,&key) == 0) goto werr;
                    if (rioWriteBulkObject(aof,o) == 0) goto werr;
                } else if (o->type == OBJ_LIST) {
                    if (rewriteListObject(aof,&key,o) == 0) goto werr;
                } else if (o->type == OBJ_SET) {
                    if (rewriteSetObject(aof,&key,o) == 0) goto werr;
                } else if (o->type == OBJ_ZSET) {
                    if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
                } else if (o->type == OBJ_HASH) {
                    if (rewriteHashObject(aof,&key,o) == 0) goto werr;
                } else if (o->type == OBJ_STREAM) {
                    if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
                } else if (o->type == OBJ_MODULE) {
                    if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
                } else {
                    serverPanic("Unknown object type");
                }
                /* Save the expire time */
                if (expiretime != -1) {
                    char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
                    if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                    if (rioWriteBulkObject(aof,&key) == 0) goto werr;
                    if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
                }
                /* Read some diff from the parent process from time to time. */
                if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
                    processed = aof->processed_bytes;
                    aofReadDiffFromParent();
                }
            }
            dictReleaseIterator(di);
            di = NULL;
        }
        return C_OK;

    werr:
        if (di) dictReleaseIterator(di);
        return C_ERR;
    }

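rewriteAppendOnlyFileRio() walks every non-empty database and emits a SELECT for it. It then converts each key into commands written to the file:

  • String → SET
  • List → RPUSH
  • Hash → HMSET
  • Set → SADD
  • Sorted Set → ZADD

Streams and module types are handled by rewriteStreamObject() and rewriteModuleObject(). A key with a TTL is followed by a PEXPIREAT command carrying its absolute expire time.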
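The header comment of rewriteAppendOnlyFile() says each variadic command carries at most AOF_REWRITE_ITEMS_PER_CMD items. The sketch below shows what that looks like on the wire: it encodes a list as RPUSH commands in the RESP format used by the AOF. emit_rpush(), appendf() and bulk() are hypothetical helpers, and per_cmd stands in for AOF_REWRITE_ITEMS_PER_CMD. The test uses a small per_cmd so the split is visible.

```c
#include <stdarg.h>
#include <stdio.h>
#include <string.h>

/* Appends formatted text at buf[*pos]. Once the buffer is full, later
 * calls are ignored and *pos stays >= cap so the caller can detect it. */
static void appendf(char *buf, size_t cap, size_t *pos, const char *fmt, ...) {
    if (*pos >= cap) return;
    va_list ap;
    va_start(ap, fmt);
    int n = vsnprintf(buf + *pos, cap - *pos, fmt, ap);
    va_end(ap);
    if (n > 0) *pos += (size_t) n;
}

/* RESP bulk string: "$<len>\r\n<data>\r\n". */
static void bulk(char *buf, size_t cap, size_t *pos, const char *s) {
    appendf(buf, cap, pos, "$%zu\r\n%s\r\n", strlen(s), s);
}

/* Emits RPUSH commands for n items, at most per_cmd items per command.
 * Returns the number of commands, or -1 on bad input / small buffer. */
static int emit_rpush(char *buf, size_t cap, const char *key,
                      const char **items, size_t n, size_t per_cmd) {
    size_t pos = 0;
    int cmds = 0;
    if (per_cmd == 0) return -1;
    if (cap) buf[0] = '\0';
    for (size_t i = 0; i < n; i += per_cmd) {
        size_t count = (n - i < per_cmd) ? n - i : per_cmd;
        appendf(buf, cap, &pos, "*%zu\r\n", count + 2); /* RPUSH+key+items */
        bulk(buf, cap, &pos, "RPUSH");
        bulk(buf, cap, &pos, key);
        for (size_t j = 0; j < count; j++)
            bulk(buf, cap, &pos, items[i + j]);
        cmds++;
    }
    return pos < cap ? cmds : -1;
}
```

With three items and per_cmd = 2, the list becomes two commands: RPUSH l a b, then RPUSH l c.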

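While iterating, rewriteAppendOnlyFileRio() also pulls incremental data from the parent. Each time more than AOF_READ_DIFF_INTERVAL_BYTES further bytes have been written, it calls aofReadDiffFromParent():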

    /* This function is called by the child rewriting the AOF file to read
     * the difference accumulated from the parent into a buffer, that is
     * concatenated at the end of the rewrite. */
    ssize_t aofReadDiffFromParent(void) {
        char buf[65536]; /* Default pipe buffer size on most Linux systems. */
        ssize_t nread, total = 0;

        while ((nread =
                read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
            server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
            total += nread;
        }
        return total;
    }
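aofReadDiffFromParent() depends on the pipe being non-blocking. Once the pipe is empty, read() returns -1 with EAGAIN, which ends the loop instead of blocking the child. The sketch below reproduces that drain loop with a plain growable buffer instead of sds. struct diffbuf, drain_fd() and set_nonblock() are hypothetical names. Redis itself makes its descriptors non-blocking with anetNonBlock().

```c
#define _POSIX_C_SOURCE 200809L
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Growable byte buffer standing in for the sds server.aof_child_diff. */
struct diffbuf { char *data; size_t len; };

/* Reads everything currently available on a non-blocking fd and appends
 * it to db. Returns the bytes read by this call (0 if the pipe was empty),
 * or -1 if memory ran out. */
static ssize_t drain_fd(int fd, struct diffbuf *db) {
    char buf[65536];
    ssize_t nread, total = 0;
    while ((nread = read(fd, buf, sizeof(buf))) > 0) {
        char *p = realloc(db->data, db->len + (size_t) nread);
        if (!p) return -1;
        memcpy(p + db->len, buf, (size_t) nread);
        db->data = p;
        db->len += (size_t) nread;
        total += nread;
    }
    return total;   /* loop ended on EAGAIN (empty) or 0 (writer closed) */
}

/* Puts fd into non-blocking mode. */
static int set_nonblock(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
```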


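Once the child has finished, it reports its copy-on-write size to the parent through the child-info pipe. It then exits with status 0, or 1 if the rewrite failed.


Next, the parent does the final cleanup.


serverCron() contains the following code: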

    /* Check if a background saving or AOF rewrite in progress terminated. */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
        ldbPendingChildren())
    {
        int statloc;
        pid_t pid;

        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;

            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            if (pid == -1) {
                serverLog(LL_WARNING,"wait3() returned an error: %s. "
                    "rdb_child_pid = %d, aof_child_pid = %d",
                    strerror(errno),
                    (int) server.rdb_child_pid,
                    (int) server.aof_child_pid);
            } else if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else {
                if (!ldbRemoveChild(pid)) {
                    serverLog(LL_WARNING,
                        "Warning, detected child with unmatched pid: %ld",
                        (long)pid);
                }
            }
            updateDictResizePolicy();
            closeChildInfoPipe();
        }
    }

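When the child exits, the parent collects its exit status with wait3() and calls backgroundRewriteDoneHandler() to handle it. wait3() is called with WNOHANG, so it never blocks the main thread. If the child has not exited yet, serverCron() simply checks again on its next run.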
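The non-blocking reap can be tried in isolation. The sketch below uses waitpid(-1, ..., WNOHANG), which behaves like the wait3() call here in that it returns 0 while no child has exited. It decodes the status the same way serverCron() does. reap_child() is a hypothetical helper.

```c
#define _POSIX_C_SOURCE 200809L
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical "cron tick": reaps a finished child without blocking.
 * Returns the reaped pid, 0 if no child has exited yet, -1 on error.
 * exitcode/bysignal are decoded like in serverCron(). */
static pid_t reap_child(int *exitcode, int *bysignal) {
    int statloc;
    pid_t pid = waitpid(-1, &statloc, WNOHANG);
    if (pid > 0) {
        *exitcode = WEXITSTATUS(statloc);
        *bysignal = WIFSIGNALED(statloc) ? WTERMSIG(statloc) : 0;
    }
    return pid;
}
```

A caller would invoke reap_child() periodically, the way serverCron() runs on a timer. On a non-zero pid it would dispatch to a handler such as backgroundRewriteDoneHandler().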

    /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
     * Handle this. */
    void backgroundRewriteDoneHandler(int exitcode, int bysignal)

    /* Flush the differences accumulated by the parent to the
     * rewritten AOF. */
    latencyStartMonitor(latency);
    snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
        (int)server.aof_child_pid);
    newfd = open(tmpfile,O_WRONLY|O_APPEND);
    if (newfd == -1) {
        serverLog(LL_WARNING,
            "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
        goto cleanup;
    }

    if (aofRewriteBufferWrite(newfd) == -1) {
        serverLog(LL_WARNING,
            "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
        close(newfd);
        goto cleanup;
    }
    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);

    serverLog(LL_NOTICE,
        "Residual parent diff successfully flushed to the rewritten AOF (%.2f MB)", (double) aofRewriteBufferSize() / (1024*1024));

    /* The only remaining thing to do is to rename the temporary file to
     * the configured file and switch the file descriptor used to do AOF
     * writes. We don't want close(2) or rename(2) calls to block the
     * server on old file deletion.
     *
     * There are two possible scenarios:
     *
     * 1) AOF is DISABLED and this was a one time rewrite. The temporary
     * file will be renamed to the configured file. When this file already
     * exists, it will be unlinked, which may block the server.
     *
     * 2) AOF is ENABLED and the rewritten AOF will immediately start
     * receiving writes. After the temporary file is renamed to the
     * configured file, the original AOF file descriptor will be closed.
     * Since this will be the last reference to that file, closing it
     * causes the underlying file to be unlinked, which may block the
     * server.
     *
     * To mitigate the blocking effect of the unlink operation (either
     * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
     * use a background thread to take care of this. First, we
     * make scenario 1 identical to scenario 2 by opening the target file
     * when it exists. The unlink operation after the rename(2) will then
     * be executed upon calling close(2) for its descriptor. Everything to
     * guarantee atomicity for this switch has already happened by then, so
     * we don't care what the outcome or duration of that close operation
     * is, as long as the file descriptor is released again. */
    if (server.aof_fd == -1) {
        /* AOF disabled */

        /* Don't care if this fails: oldfd will be -1 and we handle that.
         * One notable case of -1 return is if the old file does
         * not exist. */
        oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
    } else {
        /* AOF enabled */
        oldfd = -1; /* We'll set this to the current AOF filedes later. */
    }

    /* Rename the temporary file. This will not unlink the target file if
     * it exists, because we reference it with "oldfd". */
    latencyStartMonitor(latency);
    if (rename(tmpfile,server.aof_filename) == -1) {
        serverLog(LL_WARNING,
            "Error trying to rename the temporary AOF file %s into %s: %s",
            tmpfile,
            server.aof_filename,
            strerror(errno));
        close(newfd);
        if (oldfd != -1) close(oldfd);
        goto cleanup;
    }
    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("aof-rename",latency);

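backgroundRewriteDoneHandler() opens the temporary file temp-rewriteaof-bg-%d.aof. It calls aofRewriteBufferWrite() to append the incremental data still left in the parent's buffer, then calls rename() to rename the temporary file to server.aof_filename. When AOF is disabled and the old file exists, the handler opens it first (oldfd). The rename then only removes the old file's name, and the potentially slow unlink is deferred until that descriptor is closed.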

Note: this temporary file is the same file the child saved at the end (temp-rewriteaof-bg-%d.aof). The parent rebuilds its name from server.aof_child_pid.
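The oldfd trick from the long comment in the handler can be observed directly. While a descriptor to the old file stays open, rename(2) only removes the old name, and the inode is released when that last descriptor is closed. Redis performs that final close in a background thread. The sketch below, built around a hypothetical rename_keeping_old() helper, simply returns the descriptor to the caller.

```c
#define _POSIX_C_SOURCE 200809L
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

/* Hypothetical helper: renames `tmpfile` over `target` while holding an
 * fd to the old target, so the old inode is freed only when that fd is
 * closed. Returns the fd (-1 if target did not exist), or -2 if the
 * rename failed. */
static int rename_keeping_old(const char *tmpfile, const char *target) {
    int oldfd = open(target, O_RDONLY | O_NONBLOCK);  /* -1 is fine */
    if (rename(tmpfile, target) == -1) {
        if (oldfd != -1) close(oldfd);
        return -2;
    }
    return oldfd;   /* caller decides when (and on which thread) to close */
}
```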

    /* Write the buffer (possibly composed of multiple blocks) into the specified
     * fd. If a short write or any other error happens -1 is returned,
     * otherwise the number of bytes written is returned. */
    ssize_t aofRewriteBufferWrite(int fd) {
        listNode *ln;
        listIter li;
        ssize_t count = 0;

        listRewind(server.aof_rewrite_buf_blocks,&li);
        while((ln = listNext(&li))) {
            aofrwblock *block = listNodeValue(ln);
            ssize_t nwritten;

            if (block->used) {
                nwritten = write(fd,block->buf,block->used);
                if (nwritten != (ssize_t)block->used) {
                    if (nwritten == 0) errno = EIO;
                    return -1;
                }
                count += nwritten;
            }
        }
        return count;
    }
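aofRewriteBufferWrite() walks a linked list of fixed-size blocks filled by aofRewriteBufferAppend(). The sketch below is a self-contained, simplified version of that block list. struct block, struct blocklist, blocks_append(), blocks_write() and blocks_free() are hypothetical. BLOCK_SIZE is deliberately tiny so that the example spills into a second block; Redis uses AOF_RW_BUF_BLOCK_SIZE.

```c
#define _POSIX_C_SOURCE 200809L
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define BLOCK_SIZE 16   /* tiny on purpose; stands in for AOF_RW_BUF_BLOCK_SIZE */

/* Simplified aofrwblock kept in a singly linked list. */
struct block {
    unsigned long used, free;
    char buf[BLOCK_SIZE];
    struct block *next;
};
struct blocklist { struct block *head, *tail; };

/* Like aofRewriteBufferAppend(): fill the last block, then allocate more. */
static int blocks_append(struct blocklist *bl, const char *s, unsigned long len) {
    struct block *b = bl->tail;
    while (len) {
        if (b && b->free) {
            unsigned long n = b->free < len ? b->free : len;
            memcpy(b->buf + b->used, s, n);
            b->used += n; b->free -= n; s += n; len -= n;
        }
        if (len) {                      /* first block, or current is full */
            struct block *nb = malloc(sizeof(*nb));
            if (!nb) return -1;
            nb->used = 0; nb->free = BLOCK_SIZE; nb->next = NULL;
            if (bl->tail) bl->tail->next = nb; else bl->head = nb;
            bl->tail = b = nb;
        }
    }
    return 0;
}

/* Like aofRewriteBufferWrite(): a short write counts as an error. */
static ssize_t blocks_write(const struct blocklist *bl, int fd) {
    ssize_t count = 0;
    for (const struct block *b = bl->head; b; b = b->next) {
        if (!b->used) continue;
        ssize_t nw = write(fd, b->buf, b->used);
        if (nw != (ssize_t) b->used) return -1;
        count += nw;
    }
    return count;
}

static void blocks_free(struct blocklist *bl) {
    struct block *b = bl->head;
    while (b) { struct block *next = b->next; free(b); b = next; }
    bl->head = bl->tail = NULL;
}
```

With BLOCK_SIZE = 16, appending 7 bytes and then 15 bytes leaves the first block full (16 bytes) and 6 bytes in a second block.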


Once backgroundRewriteDoneHandler() has finished, the bgrewriteaof command is complete.




We have mentioned incremental data several times. Where does it come from?


Every Redis command is ultimately executed through the call() function.

    void call(client *c, int flags)

    /* Propagate the command into the AOF and replication link */
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        int propagate_flags = PROPAGATE_NONE;

        /* Check if the command operated changes in the data set. If so
         * set for replication AOF propagation. */
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

        /* If the client forced AOF replication of the command, set
         * the flags regardless of the command effects on the data set. */
        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
        if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;

        /* However prevent AOF replication propagation if the command
         * implementations called preventCommandPropagation() or similar,
         * or if we don't have the call() flags to do so. */
        if (c->flags & CLIENT_PREVENT_REPL_PROP ||
            !(flags & CMD_CALL_PROPAGATE_REPL))
                propagate_flags &= ~PROPAGATE_REPL;
        if (c->flags & CLIENT_PREVENT_AOF_PROP ||
            !(flags & CMD_CALL_PROPAGATE_AOF))
                propagate_flags &= ~PROPAGATE_AOF;

        /* Call propagate() only if at least one of AOF replication
         * propagation is needed. Note that modules commands handle replication
         * in an explicit way, so we never replicate them automatically. */
        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }

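After running a command, call() invokes propagate() only if at least one propagation target survives the flag checks. Normally that happens because the command modified the dataset (dirty is non-zero) or because propagation was explicitly forced. Module commands are never propagated automatically here.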
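The flag computation in call() is a small bitmask pipeline:

  • Add the AOF and replication targets if the command dirtied the dataset.
  • Add any forced targets.
  • Strip the prevented ones.

The sketch below restates this over a hypothetical struct prop_input. The PROP_* values are illustrative, not copied from server.h. The outer CMD_CALL_PROPAGATE / CLIENT_PREVENT_PROP check and the CMD_MODULE exclusion are left out.

```c
#include <stdbool.h>

/* Illustrative flag values (hypothetical, not taken from server.h). */
enum { PROP_NONE = 0, PROP_AOF = 1, PROP_REPL = 2 };

/* Hypothetical condensed view of the client and call() flags involved. */
struct prop_input {
    bool dirty;                          /* command changed the dataset */
    bool force_aof, force_repl;          /* CLIENT_FORCE_AOF / _REPL */
    bool prevent_aof, prevent_repl;      /* CLIENT_PREVENT_*_PROP */
    bool call_allows_aof, call_allows_repl; /* CMD_CALL_PROPAGATE_* */
};

/* Same order as call(): add from dirty, add forced, then strip prevented. */
static int propagate_flags(const struct prop_input *in) {
    int f = PROP_NONE;
    if (in->dirty) f |= PROP_AOF | PROP_REPL;
    if (in->force_repl) f |= PROP_REPL;
    if (in->force_aof) f |= PROP_AOF;
    if (in->prevent_repl || !in->call_allows_repl) f &= ~PROP_REPL;
    if (in->prevent_aof || !in->call_allows_aof) f &= ~PROP_AOF;
    return f;
}
```

Because the prevent checks run last, they override both dirty-based and forced propagation.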

    /* Propagate the specified command (in the context of the specified database id)
     * to AOF and Slaves.
     *
     * flags are an xor between:
     * + PROPAGATE_NONE (no propagation of command at all)
     * + PROPAGATE_AOF (propagate into the AOF file if is enabled)
     * + PROPAGATE_REPL (propagate into the replication link)
     *
     * This should not be used inside commands implementation. Use instead
     * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().
     */
    void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
                   int flags)
    {
        if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
            feedAppendOnlyFile(cmd,dbid,argv,argc);
        if (flags & PROPAGATE_REPL)
            replicationFeedSlaves(server.slaves,dbid,argv,argc);
    }

    void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
        sds buf = sdsempty();
        robj *tmpargv[3];

        /* The DB this command was targeting is not the same as the last command
         * we appended. To issue a SELECT command is needed. */
        if (dictid != server.aof_selected_db) {
            char seldb[64];

            snprintf(seldb,sizeof(seldb),"%d",dictid);
            buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
                (unsigned long)strlen(seldb),seldb);
            server.aof_selected_db = dictid;
        }

        if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
            cmd->proc == expireatCommand) {
            /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
            buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
        } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
            /* Translate SETEX/PSETEX to SET and PEXPIREAT */
            tmpargv[0] = createStringObject("SET",3);
            tmpargv[1] = argv[1];
            tmpargv[2] = argv[3];
            buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
            decrRefCount(tmpargv[0]);
            buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
        } else if (cmd->proc == setCommand && argc > 3) {
            int i;
            robj *exarg = NULL, *pxarg = NULL;
            /* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
            buf = catAppendOnlyGenericCommand(buf,3,argv);
            for (i = 3; i < argc; i ++) {
                if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
                if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
            }
            serverAssert(!(exarg && pxarg));
            if (exarg)
                buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
                                                   exarg);
            if (pxarg)
                buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
                                                   pxarg);
        } else {
            /* All the other commands don't need translation or need the
             * same translation already operated in the command vector
             * for the replication itself. */
            buf = catAppendOnlyGenericCommand(buf,argc,argv);
        }

        /* Append to the AOF buffer. This will be flushed on disk just before
         * of re-entering the event loop, so before the client will get a
         * positive reply about the operation performed. */
        if (server.aof_state == AOF_ON)
            server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

        /* If a background append only file rewriting is in progress we want to
         * accumulate the differences between the child DB and the current one
         * in a buffer, so that when the child process will do its work we
         * can append the differences to the new append only file. */
        if (server.aof_child_pid != -1)
            aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

        sdsfree(buf);
    }

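If AOF is enabled (server.aof_state == AOF_ON), feedAppendOnlyFile() appends the serialized command to the server.aof_buf buffer, which is flushed to disk before the event loop is re-entered.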

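If an AOF rewrite is in progress at that moment (server.aof_child_pid != -1), feedAppendOnlyFile() also calls aofRewriteBufferAppend() to append the command to the server.aof_rewrite_buf_blocks buffer. The data in server.aof_rewrite_buf_blocks is the incremental data: the writes the parent has accepted since the rewrite child was forked, which the child's copy of the dataset does not contain.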
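
The two checks at the end of feedAppendOnlyFile() are independent, which is easy to miss. Below is a minimal C sketch, not Redis code, that models only this routing step. `toy_server`, `toy_buf`, `buf_append()` and `toy_feed()` are hypothetical names, and plain growable byte buffers stand in for Redis's sds string (aof_buf) and block list (aof_rewrite_buf_blocks).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

/* Hypothetical growable byte buffer standing in for an sds string. */
typedef struct {
    char *data;
    size_t len;
} toy_buf;

/* Hypothetical subset of the server state that the routing step reads. */
typedef struct {
    bool aof_on;          /* models server.aof_state == AOF_ON */
    pid_t aof_child_pid;  /* models server.aof_child_pid, -1 = no rewrite */
    toy_buf aof_buf;      /* models server.aof_buf */
    toy_buf rewrite_buf;  /* models server.aof_rewrite_buf_blocks */
} toy_server;

static void buf_append(toy_buf *b, const char *s, size_t len) {
    if (len == 0) return;
    char *p = realloc(b->data, b->len + len);
    if (p == NULL) abort(); /* sketch: treat out-of-memory as fatal */
    memcpy(p + b->len, s, len);
    b->data = p;
    b->len += len;
}

/* Route one already-serialized command the way feedAppendOnlyFile() ends. */
static void toy_feed(toy_server *srv, const char *cmd, size_t len) {
    if (srv->aof_on)               /* normal AOF persistence */
        buf_append(&srv->aof_buf, cmd, len);
    if (srv->aof_child_pid != -1)  /* rewrite running: record the diff */
        buf_append(&srv->rewrite_buf, cmd, len);
}
```

The independence matters in practice: for example, when AOF is switched on at runtime, Redis 5.0 sets aof_state to AOF_WAIT_REWRITE while the initial rewrite runs, so commands reach the rewrite buffer but not aof_buf.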

    /* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
    void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
        listNode *ln = listLast(server.aof_rewrite_buf_blocks);
        aofrwblock *block = ln ? ln->value : NULL;

        while(len) {
            /* If we already got at least an allocated block, try appending
             * at least some piece into it. */
            if (block) {
                unsigned long thislen = (block->free < len) ? block->free : len;
                if (thislen) {  /* The current block is not already full. */
                    memcpy(block->buf+block->used, s, thislen);
                    block->used += thislen;
                    block->free -= thislen;
                    s += thislen;
                    len -= thislen;
                }
            }

            if (len) { /* First block to allocate, or need another block. */
                int numblocks;

                block = zmalloc(sizeof(*block));
                block->free = AOF_RW_BUF_BLOCK_SIZE;
                block->used = 0;
                listAddNodeTail(server.aof_rewrite_buf_blocks,block);

                /* Log every time we cross more 10 or 100 blocks, respectively
                 * as a notice or warning. */
                numblocks = listLength(server.aof_rewrite_buf_blocks);
                if (((numblocks+1) % 10) == 0) {
                    int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
                                                             LL_NOTICE;
                    serverLog(level,"Background AOF buffer size: %lu MB",
                        aofRewriteBufferSize()/(1024*1024));
                }
            }
        }

        /* Install a file event to send data to the rewrite child if there is
         * not one already. */
        if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
            aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
                AE_WRITABLE, aofChildWriteDiffData, NULL);
        }
    }

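aofRewriteBufferAppend() appends the command to server.aof_rewrite_buf_blocks, a list of blocks holding AOF_RW_BUF_BLOCK_SIZE bytes each, allocating new blocks as needed. It then installs a writable file event on server.aof_pipe_write_data_to_child, unless one is already registered; the event handler is aofChildWriteDiffData().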
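
The block-splitting logic is easier to see with a tiny block size. The following sketch reproduces the shape of the append loop, assuming a hypothetical 4-byte block (`TOY_BLOCK_SIZE`) and a hand-rolled linked list (`toy_block`, `toy_block_list`, `toy_rewrite_buffer_append()`) instead of Redis's `list` and `aofrwblock` types; logging and the file-event installation are left out.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define TOY_BLOCK_SIZE 4 /* tiny for illustration; Redis uses AOF_RW_BUF_BLOCK_SIZE */

typedef struct toy_block {
    size_t free, used;           /* free + used == TOY_BLOCK_SIZE */
    char buf[TOY_BLOCK_SIZE];
    struct toy_block *next;
} toy_block;

typedef struct {
    toy_block *head, *tail;
    size_t count;
} toy_block_list;

/* Append s[0..len) to the list: fill the tail block first, then allocate
 * new blocks as needed (same shape as aofRewriteBufferAppend()). */
static void toy_rewrite_buffer_append(toy_block_list *l, const char *s, size_t len) {
    toy_block *block = l->tail;
    while (len) {
        if (block) {
            size_t thislen = block->free < len ? block->free : len;
            if (thislen) { /* tail block still has room */
                memcpy(block->buf + block->used, s, thislen);
                block->used += thislen;
                block->free -= thislen;
                s += thislen;
                len -= thislen;
            }
        }
        if (len) { /* no block yet, or the tail block is full */
            block = calloc(1, sizeof(*block));
            if (block == NULL) abort();
            block->free = TOY_BLOCK_SIZE;
            if (l->tail) l->tail->next = block; else l->head = block;
            l->tail = block;
            l->count++;
        }
    }
}
```

Appending 10 bytes to an empty list with this block size yields three blocks holding 4, 4 and 2 bytes. One plausible reason for a block list rather than a single growing sds string is that an append never has to realloc and copy the whole, possibly very large, diff buffer.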

    /* Event handler used to send data to the child process doing the AOF
     * rewrite. We send pieces of our AOF differences buffer so that the final
     * write when the child finishes the rewrite will be small. */
    void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
        listNode *ln;
        aofrwblock *block;
        ssize_t nwritten;
        UNUSED(el);
        UNUSED(fd);
        UNUSED(privdata);
        UNUSED(mask);

        while(1) {
            ln = listFirst(server.aof_rewrite_buf_blocks);
            block = ln ? ln->value : NULL;
            if (server.aof_stop_sending_diff || !block) {
                aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
                                  AE_WRITABLE);
                return;
            }
            if (block->used > 0) {
                nwritten = write(server.aof_pipe_write_data_to_child,
                                 block->buf,block->used);
                if (nwritten <= 0) return;
                memmove(block->buf,block->buf+nwritten,block->used-nwritten);
                block->used -= nwritten;
                block->free += nwritten;
            }
            if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
        }
    }

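aofChildWriteDiffData() sends the data in server.aof_rewrite_buf_blocks to the child process through the data pipe, deleting each block once it has been fully written. It removes its own file event once the list is empty or server.aof_stop_sending_diff has been set. If write() returns 0 or an error (for example, because the non-blocking pipe is full), it simply returns and continues on the next writable event.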
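
aofChildWriteDiffData() has to cope with a non-blocking pipe that may accept only part of a block. The sketch below mimics that drain loop on an ordinary POSIX pipe. `toy_block`, `toy_write_diff()` and `toy_set_nonblock()` are hypothetical, plain fcntl() stands in for anetNonBlock(), and the boolean return value stands in for the aeDeleteFileEvent() call in the original.

```c
#include <assert.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define TOY_BLOCK_SIZE 64 /* hypothetical; Redis uses AOF_RW_BUF_BLOCK_SIZE */

typedef struct toy_block {
    size_t used;
    char buf[TOY_BLOCK_SIZE];
    struct toy_block *next;
} toy_block;

/* Drain as much of the block queue as the non-blocking fd accepts.
 * Returns true when the queue is empty or stop_sending is set, i.e. when
 * the caller should remove its writable event. */
static bool toy_write_diff(int fd, toy_block **head, bool stop_sending) {
    while (1) {
        toy_block *block = *head;
        if (stop_sending || !block) return true;
        if (block->used > 0) {
            ssize_t nwritten = write(fd, block->buf, block->used);
            if (nwritten <= 0) return false; /* pipe full or error: retry later */
            /* Partial write: shift the unsent bytes to the front of the block. */
            memmove(block->buf, block->buf + nwritten, block->used - (size_t)nwritten);
            block->used -= (size_t)nwritten;
        }
        if (block->used == 0) { /* block fully sent: drop it */
            *head = block->next;
            free(block);
        }
    }
}

/* Put a descriptor into non-blocking mode. */
static int toy_set_nonblock(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
```

When the stop flag is set, whatever is still queued is not lost: in Redis 5.0 the parent writes the remaining rewrite buffer into the new AOF file itself (aofRewriteBufferWrite() in backgroundRewriteDoneHandler()) after the child exits.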


Additional details

So far we have walked through how the bgrewriteaof command is executed; a few remaining details are covered below:

    /* Create the pipes used for parent - child process IPC during rewrite.
     * We have a data pipe used to send AOF incremental diffs to the child,
     * and two other pipes used by the children to signal it finished with
     * the rewrite so no more data should be written, and another for the
     * parent to acknowledge it understood this new condition. */
    int aofCreatePipes(void) {
        int fds[6] = {-1, -1, -1, -1, -1, -1};
        int j;

        if (pipe(fds) == -1) goto error; /* parent -> children data. */
        if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
        if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
        /* Parent -> children data is non blocking. */
        if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
        if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
        if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;

        server.aof_pipe_write_data_to_child = fds[1];
        server.aof_pipe_read_data_from_parent = fds[0];
        server.aof_pipe_write_ack_to_parent = fds[3];
        server.aof_pipe_read_ack_from_child = fds[2];
        server.aof_pipe_write_ack_to_child = fds[5];
        server.aof_pipe_read_ack_from_parent = fds[4];
        server.aof_stop_sending_diff = 0;
        return C_OK;

    error:
        serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
            strerror(errno));
        for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
        return C_ERR;
    }

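Before the main process forks the child, rewriteAppendOnlyFileBackground() calls aofCreatePipes() to create three pipes for parent-child communication: one carries the incremental data from parent to child, and the other two carry the '!' acknowledgements in each direction. It also registers a readable file event on the read end of the child-to-parent ack pipe (fds[2], i.e. server.aof_pipe_read_ack_from_child), whose handler is aofChildPipeReadable().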
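
The descriptor bookkeeping in aofCreatePipes() is compact but easy to misread. This sketch uses a hypothetical struct `toy_aof_pipes`, whose field names mirror the server.aof_pipe_* fields, plus hypothetical helpers `toy_nonblock()` and `toy_create_pipes()`; plain fcntl() replaces anetNonBlock(), and the event registration is left out.

```c
#include <assert.h>
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical holder for the six descriptors aofCreatePipes() stores. */
typedef struct {
    int write_data_to_child, read_data_from_parent; /* pipe 1: parent -> child diffs */
    int write_ack_to_parent, read_ack_from_child;   /* pipe 2: child -> parent '!' */
    int write_ack_to_child,  read_ack_from_parent;  /* pipe 3: parent -> child '!' */
} toy_aof_pipes;

static int toy_nonblock(int fd) {
    int flags = fcntl(fd, F_GETFL);
    return flags == -1 ? -1 : fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Returns 0 on success, -1 on error (closing whatever was opened). */
static int toy_create_pipes(toy_aof_pipes *p) {
    int fds[6] = {-1, -1, -1, -1, -1, -1};
    if (pipe(fds) == -1 || pipe(fds + 2) == -1 || pipe(fds + 4) == -1) goto error;
    /* Only the data pipe is non-blocking, as in aofCreatePipes(). */
    if (toy_nonblock(fds[0]) == -1 || toy_nonblock(fds[1]) == -1) goto error;
    p->read_data_from_parent = fds[0]; /* even index: read end */
    p->write_data_to_child   = fds[1]; /* odd index: write end */
    p->read_ack_from_child   = fds[2];
    p->write_ack_to_parent   = fds[3];
    p->read_ack_from_parent  = fds[4];
    p->write_ack_to_child    = fds[5];
    return 0;
error:
    for (int j = 0; j < 6; j++) if (fds[j] != -1) close(fds[j]);
    return -1;
}
```

Each pipe() call fills a pair in which the even index is the read end and the odd index is the write end, which is why the parent writes diffs to fds[1] while the child reads them from fds[0].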

    /* This event handler is called when the AOF rewriting child sends us a
     * single '!' char to signal we should stop sending buffer diffs. The
     * parent sends a '!' as well to acknowledge. */
    void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
        char byte;
        UNUSED(el);
        UNUSED(privdata);
        UNUSED(mask);

        if (read(fd,&byte,1) == 1 && byte == '!') {
            serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
            server.aof_stop_sending_diff = 1;
            if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
                /* If we can't send the ack, inform the user, but don't try again
                 * since in the other side the children will use a timeout if the
                 * kernel can't buffer our write, or, the children was
                 * terminated. */
                serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
                    strerror(errno));
            }
        }
        /* Remove the handler since this can be called only one time during a
         * rewrite. */
        aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
    }

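When the main process receives the '!' sent by the child, it stops sending incremental data to the child (by setting server.aof_stop_sending_diff = 1) and writes a '!' back as the acknowledgement. The handler then removes itself, since this exchange happens only once per rewrite.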
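
The whole '!' exchange can be tried out with fork() and two pipes. In this sketch, `toy_child_request_stop()`, `toy_parent_on_ack_readable()` and `toy_run_handshake()` are hypothetical. The child waits for the ack with poll() and an assumed 5-second timeout, standing in for the timed read the Redis child performs, and the parent side is the body of aofChildPipeReadable() without the event-loop calls.

```c
#include <assert.h>
#include <poll.h>
#include <stdbool.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Child side: ask the parent to stop sending diffs, then wait up to
 * timeout_ms for the '!' ack. */
static bool toy_child_request_stop(int write_ack_to_parent, int read_ack_from_parent,
                                   int timeout_ms) {
    char byte;
    if (write(write_ack_to_parent, "!", 1) != 1) return false;
    struct pollfd pfd = { .fd = read_ack_from_parent, .events = POLLIN };
    if (poll(&pfd, 1, timeout_ms) != 1) return false; /* timeout or error */
    return read(read_ack_from_parent, &byte, 1) == 1 && byte == '!';
}

/* Parent side: mirrors the body of aofChildPipeReadable(). */
static void toy_parent_on_ack_readable(int read_ack_from_child, int write_ack_to_child,
                                       bool *stop_sending_diff) {
    char byte;
    if (read(read_ack_from_child, &byte, 1) == 1 && byte == '!') {
        *stop_sending_diff = true; /* no more diffs through the data pipe */
        if (write(write_ack_to_child, "!", 1) != 1) {
            /* Redis only logs a warning here and does not retry. */
        }
    }
}

/* Run the handshake between a forked child and this process. Returns true
 * if the parent set its stop flag and the child received the ack. */
static bool toy_run_handshake(void) {
    int c2p[2], p2c[2]; /* child -> parent ack, parent -> child ack */
    bool stop = false;
    int status = 0;
    if (pipe(c2p) == -1) return false;
    if (pipe(p2c) == -1) { close(c2p[0]); close(c2p[1]); return false; }
    pid_t pid = fork();
    if (pid == 0) /* child: exit status 0 means the ack arrived in time */
        _exit(toy_child_request_stop(c2p[1], p2c[0], 5000) ? 0 : 1);
    if (pid > 0) {
        toy_parent_on_ack_readable(c2p[0], p2c[1], &stop); /* blocks until '!' */
        if (waitpid(pid, &status, 0) != pid) stop = false;
    }
    for (int j = 0; j < 2; j++) { close(c2p[j]); close(p2c[j]); }
    return pid > 0 && stop && WIFEXITED(status) && WEXITSTATUS(status) == 0;
}
```

The timeout on the child side is what keeps the child from hanging if the parent's ack never arrives, which is the case the comment in aofChildPipeReadable() alludes to.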


This article is reposted from 谷竹.
