作者:朱鹏举
新人 DBA ,会点 MySQL ,Redis ,Oracle ,在知识的海洋中挣扎,活下来就算成功...
本文来源:原创投稿
* 爱可生开源社区出品,原创内容未经授权不得随意使用,转载请联系小编并注明来源。
什么是 AOF 重写
简单来说,AOF 重写就是根据当时键值对的最新状态,为它生成对应的写入命令,然后写入到临时 AOF 日志中。在重写期间 Redis 会将发生更改的数据写入到重写缓冲区 aof_rewrite_buf_blocks 中,于重写结束后合并到临时 AOF 日志中,最后使用临时 AOF 日志替换原来的 AOF 日志。当然,为了避免阻塞主线程,Redis 会 fork 一个进程来执行 AOF 重写操作。
如何定义 AOF 重写缓冲区
我知道你很急,但是你先别急,在了解AOF重写流程之前你会先遇到第一个问题,那就是如何定义AOF重写缓冲区。
一般来说我们会想到用malloc函数来初始化一块内存用于保存AOF重写期间主进程收到的命令,当剩余空间不足时再用realloc函数对其进行扩容。但是Redis并没有这么做,Redis定义了一个aofrwblock结构体,其中包含了一个10MB大小的字符数组,当做一个数据块,负责记录AOF重写期间主进程收到的命令,然后使用aof_rewrite_buf_blocks列表将这些数据块连接起来,每次分配一个aofrwblock数据块。
//AOF重写缓冲区大小为10MB,每一次分配一个aofrwblock
typedef struct aofrwblock {
unsigned long used, free;
char buf[AOF_RW_BUF_BLOCK_SIZE]; //10MB
} aofrwblock;
AOF 重写流程
int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
long long start;
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
if (aofCreatePipes() != C_OK) return C_ERR;
openChildInfoPipe();
start = ustime();
if ((childpid = fork()) == 0) {
char tmpfile[256];
/* Child */
closeListeningSockets(0);
redisSetProcTitle("redis-aof-rewrite");
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);
if (private_dirty) {
serverLog(LL_NOTICE,
"AOF rewrite: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
server.child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_AOF);
exitFromChild(0);
} else {
exitFromChild(1);
}
} else {
/* Parent */
server.stat_fork_time = ustime()-start;
/* GB per second. */
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024);
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
if (childpid == -1) {
closeChildInfoPipe();
serverLog(LL_WARNING,
"Can't rewrite append only file in background: fork: %s",
strerror(errno));
aofClosePipes();
return C_ERR;
}
serverLog(LL_NOTICE,
"Background append only file rewriting started by pid %d",childpid);
server.aof_rewrite_scheduled = 0;
server.aof_rewrite_time_start = time(NULL);
server.aof_child_pid = childpid;
updateDictResizePolicy();
server.aof_selected_db = -1;
replicationScriptCacheFlush();
return C_OK;
}
return C_OK; /* unreached */
}
父进程
若当前有正在进行的 AOF 重写子进程或者 RDB 持久化子进程,则退出 AOF 重写流程 创建3个管道
parent -> children data children -> parent ack parent -> children ack
将 parent -> children data 设置为非阻塞 在 children -> parent ack 上注册读事件的监听 将数组 fds 中的六个⽂件描述符分别复制给 server 变量的成员变量 打开 children->parent ack 通道,用于将 RDB/AOF 保存过程的信息发送给父进程 用 start 变量记录当前时间 fork 出一个子进程,通过写时复制的形式共享主线程的所有内存数据
子进程
关闭监听 socket ,避免接收客户端连接
设置进程名
生成 AOF 临时文件名
遍历每个数据库的每个键值对,以插入(命令+键值对)的方式写到临时 AOF ⽂件中
父进程
计算上一次 fork 已经花费的时间 计算每秒写了多少 GB 内容 判断上一次 fork 是否结束,没结束则此次 AOF 重写流程就此中止 将 aof_rewrite_scheduled 设置为0(表示现在没有待调度执⾏的 AOF 重写操作) 关闭 rehash 功能(Rehash 会带来较多的数据移动操作,这就意味着⽗进程中的内存修改会⽐较多,对于 AOF 重写⼦进程来说,就需要更多的时间来执行写时复制,进⽽完成 AOF ⽂件的写⼊,这就会给 Redis 系统的性能造成负⾯影响) 将 aof_selected_db 设置为-1(以强制在下一次调用 feedAppendOnlyFile 函数(写 AOF 日志)的时候将 AOF 重写期间累计的内容合并到 AOF 日志中) 当发现正在进行 AOF 重写任务的时候 (1)将收到的新的写命令缓存在 aofrwblock 中 (2)检查 parent -> children data 上面有没有写监听,没有的话注册一个 (3)触发写监听时从 aof_rewrite_buf_blocks 列表中逐个取出 aofrwblock 数据块,通过 parent -> children data 发送到 AOF 重写子进程 子进程重写结束后,将重写期间 aof_rewrite_buf_blocks 列表中没有消费完成的数据追加写入到临时 AOF 文件中
管道机制
1. 子进程从 parent -> children data 读取数据 (触发时机)
rewriteAppendOnlyFileRio 由重写⼦进程执⾏,负责遍历 Redis 每个数据库,⽣成 AOF 重写⽇志,在这个过程中,会不时地调⽤ aofReadDiffFromParent
rewriteAppendOnlyFile 重写⽇志的主体函数,也是由重写⼦进程执⾏的,本⾝会调⽤rewriteAppendOnlyFileRio,调⽤完后会调⽤ aofReadDiffFromParent 多次,尽可能多地读取主进程在重写⽇志期间收到的操作命令 rdbSaveRio
创建 RDB ⽂件的主体函数,使⽤ AOF 和 RDB 混合持久化机制时,这个函数会调⽤aofReadDiffFromParent
//将从父级累积的差异读取到缓冲区中,该缓冲区在重写结束时连接
ssize_t aofReadDiffFromParent(void) {
char buf[65536]; //大多数Linux系统上的默认管道缓冲区大小
ssize_t nread, total = 0;
while ((nread =
read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
total += nread;
}
return total;
}
2. 子进程向 children -> parent ack 发送 ACK信号
在完成⽇志重写,以及多次向⽗进程读取操作命令后,向 children -> parent ack 发送"!",也就是向主进程发送 ACK 信号,让主进程停⽌发送收到的新写操作
int rewriteAppendOnlyFile(char *filename) {
rio aof;
FILE *fp;
char tmpfile[256];
char byte;
//注意,与rewriteAppendOnlyFileBackground()函数使用的临时名称相比,我们必须在此处使用不同的临时名称
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
return C_ERR;
}
server.aof_child_diff = sdsempty();
rioInitWithFile(&aof,fp);
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
} else {
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
}
//当父进程仍在发送数据时,在此处执行初始的慢速fsync,以便使下一个最终的fsync更快
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
//再读几次,从父级获取更多数据。我们不能永远读取(服务器从客户端接收数据的速度可能快于它向子进程发送数据的速度),所以我们尝试在循环中读取更多的数据,只要有更多的数据出现。如果看起来我们在浪费时间,我们会中止(在没有新数据的情况下,这会在20ms后发生)。
int nodata = 0;
mstime_t start = mstime();
while(mstime()-start < 1000 && nodata < 20) {
if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
{
nodata++;
continue;
}
nodata = 0; /* Start counting from zero, we stop on N *contiguous*
timeouts. */
aofReadDiffFromParent();
}
//发送ACK信息让父进程停止发送消息
if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
goto werr;
//等待父进程返回的ACK信息,超时时间为10秒。通常父进程应该尽快回复,但万一失去回复,则确信子进程最终会被终止。
if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
byte != '!') goto werr;
serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");
//如果存在最终差异数据,那么将读取
aofReadDiffFromParent();
//将收到的差异数据写入文件
serverLog(LL_NOTICE,
"Concatenating %.2f MB of AOF diff received from parent.",
(double) sdslen(server.aof_child_diff) / (1024*1024));
if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
goto werr;
//确保数据不会保留在操作系统的输出缓冲区中
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;
//使用RENAME确保仅当生成DB文件正常时,才自动更改DB文件
if (rename(tmpfile,filename) == -1) {
serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
unlink(tmpfile);
return C_ERR;
}
serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
return C_OK;
werr:
serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
return C_ERR;
}
当 children -> parent ack 上有了数据,就会触发之前注册的读监听
判断这个数据是不是"!"
是就向 parent -> children ack 写入"!",表⽰主进程已经收到重写⼦进程发送的 ACK 信息,同时给重写⼦进程回复⼀个 ACK 信息
void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
char byte;
UNUSED(el);
UNUSED(privdata);
UNUSED(mask);
if (read(fd,&byte,1) == 1 && byte == '!') {
serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
server.aof_stop_sending_diff = 1;
if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
//如果我们无法发送ack,请通知用户,但不要重试,因为在另一侧,如果内核无法缓冲我们的写入,或者子级已终止,则子级将使用超时
serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
strerror(errno));
}
}
//删除处理程序,因为在重写期间只能调用一次
aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
}
1. 手动触发
当前没有正在执⾏ AOF 重写的⼦进程
当前没有正在执⾏创建 RDB 的⼦进程,有会将 aof_rewrite_scheduled 设置为1(AOF 重写操作被设置为了待调度执⾏)
void bgrewriteaofCommand(client *c) {
if (server.aof_child_pid != -1) {
addReplyError(c,"Background append only file rewriting already in progress");
} else if (server.rdb_child_pid != -1) {
server.aof_rewrite_scheduled = 1;
addReplyStatus(c,"Background append only file rewriting scheduled");
} else if (rewriteAppendOnlyFileBackground() == C_OK) {
addReplyStatus(c,"Background append only file rewriting started");
} else {
addReply(c,shared.err);
}
}
2. 开启 AOF 与主从复制
开启 AOF 功能以后,执行一次 AOF 重写
主从节点在进⾏复制时,如果从节点的 AOF 选项被打开,那么在加载解析 RDB ⽂件时,AOF 选项会被关闭,⽆论从节点是否成功加载 RDB ⽂件,restartAOFAfterSYNC 函数都会被调⽤,⽤来恢复被关闭的 AOF 功能,在这个过程中会执行一次 AOF 重写
int startAppendOnly(void) {
char cwd[MAXPATHLEN]; //错误消息的当前工作目录路径
int newfd;
newfd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
serverAssert(server.aof_state == AOF_OFF);
if (newfd == -1) {
char *cwdp = getcwd(cwd,MAXPATHLEN);
serverLog(LL_WARNING,
"Redis needs to enable the AOF but can't open the "
"append only file %s (in server root dir %s): %s",
server.aof_filename,
cwdp ? cwdp : "unknown",
strerror(errno));
return C_ERR;
}
if (server.rdb_child_pid != -1) {
server.aof_rewrite_scheduled = 1;
serverLog(LL_WARNING,"AOF was enabled but there is already a child process saving an RDB file on disk. An AOF background was scheduled to start when possible.");
} else {
//关闭正在进行的AOF重写进程,并启动一个新的AOF:旧的AOF无法重用,因为它没有累积AOF缓冲区。
if (server.aof_child_pid != -1) {
serverLog(LL_WARNING,"AOF was enabled but there is already an AOF rewriting in background. Stopping background AOF and starting a rewrite now.");
killAppendOnlyChild();
}
if (rewriteAppendOnlyFileBackground() == C_ERR) {
close(newfd);
serverLog(LL_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
return C_ERR;
}
}
//我们正确地打开了AOF,现在等待重写完成,以便将数据附加到磁盘上
server.aof_state = AOF_WAIT_REWRITE;
server.aof_last_fsync = server.unixtime;
server.aof_fd = newfd;
return C_OK;
}
3. 定时任务
每100毫秒触发一次,由 server.hz 控制,默认10 当前没有在执⾏的RDB⼦进程 && AOF重写⼦进程 && aof_rewrite_scheduled=1 当前没有在执⾏的RDB⼦进程 && AOF重写⼦进程 && aof_rewrite_scheduled=0 AOF功能已启⽤ && AOF⽂件⼤⼩⽐例超出auto-aof-rewrite-percentage && AOF⽂件⼤⼩绝对值超出auto-aofrewrite-min-size
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
......
//判断当前没有在执⾏的RDB⼦进程 && AOF重写⼦进程 && aof_rewrite_scheduled=1
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.aof_rewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}
//检查正在进行的后台保存或AOF重写是否终止
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren())
{
int statloc;
pid_t pid;
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
if (pid == -1) {
serverLog(LL_WARNING,"wait3() returned an error: %s. "
"rdb_child_pid = %d, aof_child_pid = %d",
strerror(errno),
(int) server.rdb_child_pid,
(int) server.aof_child_pid);
} else if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else {
if (!ldbRemoveChild(pid)) {
serverLog(LL_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
}
updateDictResizePolicy();
closeChildInfoPipe();
}
} else {
//如果没有正在进行的后台save/rewrite,请检查是否必须立即save/rewrite
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
//如果我们达到了给定的更改量、给定的秒数,并且最新的bgsave成功,或者如果发生错误,至少已经过了CONFIG_bgsave_RETRY_DELAY秒,则保存。
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(server.rdb_filename,rsiptr);
break;
}
}
//判断AOF功能已启⽤ && AOF⽂件⼤⼩⽐例超出auto-aof-rewrite-percentage && AOF⽂件⼤⼩绝对值超出auto-aof-rewrite-min-size
if (server.aof_state == AOF_ON &&
server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
}
......
return 1000/server.hz;
}
AOF 重写功能的缺点
哪怕是你心中的她,也并非是完美无缺的存在,更别说 Redis 这个人工产物了。但不去发现也就自然而然不存在缺点,对吧~
1. 内存开销
在 AOF 重写期间,主进程会将 fork 之后的数据变化写进 aof_rewrite_buf 与 aof_buf 中,其内容绝大部分是重复的,在高流量写入的场景下两者消耗的空间几乎一样大。 AOF 重写带来的内存开销有可能导致 Redis 内存突然达到 maxmemory 限制,甚至会触发操作系统限制被 OOM Killer 杀死,导致 Redis 不可服务。
2. CPU 开销
在 AOF 重写期间主进程需要花费 CPU 时间向 aof_rewrite_buf 写数据,并使用 eventloop 事件循环向子进程发送 aof_rewrite_buf 中的数据。
//将数据附加到AOF重写缓冲区,如果需要,分配新的块
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
......
//创建事件以便向子进程发送数据
if (!server.aof_stop_sending_diff &&
aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0)
{
aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
AE_WRITABLE, aofChildWriteDiffData, NULL);
}
......
}
在子进程执行 AOF 重写操作的后期,会循环读取 pipe 中主进程发送来的增量数据,然后追加写入到临时 AOF 文件。
int rewriteAppendOnlyFile(char *filename) {
......
//再次读取几次以从父进程获取更多数据。我们不能永远读取(服务器从客户端接收数据的速度可能快于它向子级发送数据的速度),因此我们尝试在循环中读取更多数据,只要有很好的机会会有更多数据。如果看起来我们在浪费时间,我们会中止(在没有新数据的情况下,这会在20ms后发生)
int nodata = 0;
mstime_t start = mstime();
while(mstime()-start < 1000 && nodata < 20) {
if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
{
nodata++;
continue;
}
nodata = 0; /* Start counting from zero, we stop on N *contiguous*
timeouts. */
aofReadDiffFromParent();
}
......
}
在子进程完成 AOF 重写操作后,主进程会在 backgroundRewriteDoneHandler 中进行收尾工作,其中一个任务就是将在重 写期间 aof_rewrite_buf 中没有消费完成的数据写入临时 AOF 文件,消耗的 CPU 时间与 aof_rewrite_buf 中遗留的数据量成正比。
3. 磁盘IO开销
4. Fork
为了避免一次性拷贝大量内存数据给子进程造成的长时间阻塞问题,fork 采用操作系统提供的写时复制(Copy-On-Write)机制,但 fork 子进程需要拷贝进程必要的数据结构,其中有一项就是拷贝内存页表(虚拟内存和物理内存的映射索引表)。这个拷贝过程会消耗大量 CPU 资源,拷贝完成之前整个进程是会阻塞的,阻塞时间取决于整个实例的内存大小,实例越大,内存页表越大,fork 阻塞时间越久。拷贝内存页表完成后,子进程与父进程指向相同的内存地址空间,也就是说此时虽然产生了子进程,但是并没有申请与父进程相同的内存大小。
4. Redis 5.0.8源码:https://github.com/redis/redis/tree/5.0
爱可生开源社区的 SQLE 是一款面向数据库使用者和管理者,支持多场景审核,支持标准化上线流程,原生支持 MySQL 审核且数据库类型可扩展的 SQL 审核工具。
类型 | 地址 |
---|---|
版本库 | https://github.com/actiontech/sqle |
文档 | https://actiontech.github.io/sqle-docs-cn/ |
发布信息 | https://github.com/actiontech/sqle/releases |
数据审核插件开发文档 | https://actiontech.github.io/sqle-docs-cn/3.modules/3.7_auditplugin/auditplugin_development.html |
更多关于 SQLE 的信息和交流,请加入官方QQ交流群:637150065...