数据同步
Slave实例向Master实例申请数据同步
Slave实例在syncWithMaster
函数中完成建立与Master实例的连接之后,便会将redisServer.repl_state
这个状态字段切换至REPL_STATE_SEND_PSYNC
这个状态,表示Slave实例将要向Master发起数据同步请求。
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {...if (server.repl_state == REPL_STATE_SEND_PSYNC){if (slaveTryPartialResynchronization(fd, 0) == PSYNC_WRITE_ERROR){goto wirte_error;}server.repl_state = REPL_STATE_RECEIVE_PSYNC;return;}if (server.repl_state != REPL_STATE_RECEIVE_PSYNC){goto error;}psync_result = slaveTryPartialResynchronization(fd,1);...}
在上面这段代码片段之中,syncWithMaster
会分两次调用slaveTryPartialResynchronization
接口向Master尝试请求数据同步:
第一次调用会向Master发起PSYNC命令,用以通报自身的复制信息。如果这个Slave是在连接断开之后重新连接的情况,则会将
redisServer.cached_master
之中缓存的同步信息包括replid
以及offset
通报给Master,由Master判断是进行增量数据同步还是全量的数据同步。int slaveTryPartialResynchronization(int fd, int read_reply){//发送命令if (!read_reply){return PSYNC_WAIT_REPLY;}...}第二次调用则是接收从Master返回的PSYNC命令的结果:
int slaveTryPartialResynchronization(int fd, int read_reply){...// 读取命令返回reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);aeDeleteFileEvent(server.el,fd,AE_READABLE);if (!strncmp(reply,"+FULLRESYNC",11)){//解析replid以及offset...replicationDiscardCachedMaster();return PSYNC_FULLRESYNC;}if (!strncmp(reply,"+CONTINUE",9)){...replicationResurrectCachedMaster(fd);if (server.repl_backlog == NULL) createReplicationBacklog();return PSYNC_CONTINUE;}...}+FULLRESYNC <replid> <offset>
,形如这个格式的返回,表示只能进行全量同步,同时Master实例会通告自己的replid
以及repl_offset
信息。+CONTINUE <replid>
,形如这个格式的返回,表示可以进行增量数据同步,Master实例会通过自己的replid
给Slave。
而在Master一段,则是相当于一次处理PSYNC命令的过程。PSYNC命令用于Slave实例在通过REPLCONF命令向Master实例设置了复制相关数据之后向Master请求数据同步,这个命令也是一条内部命令,由Slave实例一侧自动执行,该命令的格式为:
PSYNC <replid> <reploffset>
这个命令之中:
<replid>
指定了Matser实例对应的复制ID,如果Slave发送的<replid>
为?
,则表示是第一次建立连接,需要进行一次全量的数据同步。<reploffset>
指定了Slave自身的复制偏移,同于通知Master是否需要进行一次全量的数据同步
PSYNC命令是通过下面这个syncCommand
函数来实现的:
void syncCommand(client *c);
我们可以来看一下这个函数的一些代码片段,借此窥探PSYNC在处理Master与Slave建立连接的一些细节:
void syncCommand(client *c){if (c->flags & CLIENT_SLAVE) return;if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED){...return;}if (clientHasPendingReplies(c)){...return;}//检查是否可以进行增量的数据同步c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;if (server.repl_disable_tcp_nodelay)anetDisableTcpNoDelay(NULL, c->fd);c->repldbfd = -1;c->flags |= CLIENT_SLAVE;listAddNodeTail(server.slaves, c);if (listLength(server.slaves) == 1 && server.repl_backlog == NULL){chanegReplicationId();createReplicationBacklog();}//处理全量同步的细节数据}
通过上面的代码片段,我们可以发现:
在Master开始处理PSYNC命令之前,Slave实例对应的客户端
client
与其他的客户端没有任何区别,并不具有CLIENT_SLAVE
这个特殊的标记。一个Slave实例也可以接收PSYNC命令,用于处理Sub-Slave的连接,但是只能在处于
REPL_STATE_CONNECTED
状态下才可以处理PSYNC命令。如果Slave实例对应的客户端
client
之中,还有数据在输出缓存里没有被发送到网络上的话,暂时不能执行PSYNC命令。只有在PSYNC命令执行之后,才会给Slave对应的
client
对象设置上CLIENT_SLAVE
这个特殊的标记掩码,并且将这个客户端对象加入到redisServer.slaves
这个服务器双端链表之中。当第一个Slave实例加入到
redisServer.slaves
之中,并且没有分配积压缓冲区的话,会调用createReplicationBacklog
接口来创建积压缓冲区。
执行数据同步
在前面介绍的PSYNC命令,仅介绍了一部分关于建立Master与Slave建立连接的过程,而数据同步的相关流程的一部分逻辑也是在PSYNC命令的对应函数接口之中实现的。接下来我们就来看一下全量数据同步与增量数据同步的相关内容。
全量数据同步
我们继续来展示一段syncCommand
接口的代码片段:
void syncCommand(client *c){...//如果无法进行增量数据同步,那么后续会开始处理全量同步的逻辑c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;...if (server.rdb_child_pid != -1 && server.rdb_child_type == RDB_CHILD_TYPE_DISK){...listRewind(server.slaves, &li);while((ln = listNext(&li))){slave = ln->value;if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;}if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)){copyClientOutputBuffer(c, slave);replicationSetupForFullResync(c, slave->psync_inirial_offset);}}else if (server.rdb_child_pid != -1 && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET){}else{if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)){}else{if (server.aof_child_pid == -1){startBgsaveForReplication(c->slave_capa);}}}}
这里在我们可以总结出开启数据同步的逻辑与流程:
初始将Slave实例设置为
SLAVE_STATE_WAIT_BGSAVE_START
状态,表示当前Slave实例正在等待Master实例开始生成RDB文件。如果Master当前有后台子进程在向磁盘上生成RDB文件,那么当前Master上所有的Slave,如果有Slave处于
SLAVE_STATE_WAIT_BGSAVE_END
的状态,这意味着这次RDB文件是另外一个Slave由于全量数据同步而请求的。对于这种情况,如果两个Slave的client.slave_capa
相同,想么后面请求数据同步的Slave将会复用前面Slave所请求的这次RDB文件。如果Master当前有后台子进程在通过Socket来生成RDB文件,那么说明有其他的Slave在进行无盘的数据同步,这种情况下不会在开启新的数据同步。
对于没有后台进程生成RDB文件的Master实例,检查服务器是否配置了
server.repl_diskless_sync
,如果服务器配置了无盘数据同步,同时Slave支持SLAVE_CAPA_EOF
,那么Master会通过replicationCron
接口在下一次心跳之中处理无盘的数据同步。如果服务器没有开启无盘数据同步,或者Slave不支持
SLAVE_CAPA_EOF
,其在没有后台子进程重写AOF的情况下,通过调用startBgSaveForReplication
接口来开启同步。
Master开启数据同步生成RDB文件的逻辑是通过startBgsaveForReplication
这个函数接口来实现的:
int startBgsaveForReplication(int mincapa);
这个函数会根据Master实例上redisServer.repl_diskless_sync
的配置以及Slave实例通过REPLCONF通报的对应客户端client.slave_capa
的配置,通过创建子进程的方式进行有盘的RDB文件生成,或是通过网络采用无盘的RDB文件生成逻辑。
在子进程的处理数据的流程开始之后,Master会通过下面这个函数通知Slave实例请求同步的结果:
int replicationSetupSlaveForFullResync(client *slave, long long offset);
调用这个函数,会将Slave对应的客户端client.replstate
字段设置为SLAVE_STATE_WAIT_BGSAVE_END
状态,表示其正在等待RDB文件生成结束,同时设置这个Slave对应客户端对象client.psync_initial_offset
初始复制偏移字段为系统当前的复制偏移量。最后向Slave发送一条返回信息,消息的格式也就是前面介绍的:
+FULLRESYNC <replid> <offset>
这条返回信息告知Slave实例一侧,需要进行一次全量的数据同步,并且将当前Master的复制ID以及复制偏移量一并发送给Slave实例。
有盘的RDB文件数据同步
对于有盘RDB文件的数据同步,Redis会启动一个后台子进程生成RDB文件并将这个文件存储到磁盘上。当文件生成结束后,在通过网络连接将这个文件之中的数据发送到对应的一个或者多个Slave实例上。startBgsaveReplication
函数会通过前面我们在介绍RDB数据持久化时所介绍的rdbSaveBackground
函数接口来异步生成RDB文件,在子进程结束的回调函数backgroundSaveDoneHandlerDisk
中会调用updateSlavesWaitingBgsave
来处理等待RDB生成的那些Slave对象。
void updateSlavesWaitingBgsave(int bgsaveerr, int type);
对于等待RDB文件的Slave,会将其对应的客户端状态client.replstate
从SLAVE_STATE_WIAT_BGSAVE_END
的状态切换至SLAVE_STATE_SEND_BULK
的状态,这个新状态表示这个的Slave客户端进入了传输同步数据的状态。同时在Master实例上为这个Slave对应的client
对象在事件循环之中注册可写事件的处理函数sendBulkToSlave
,这也就意味着基于有盘RDB文件的数据同步,数据的传输是在Redis的主线程之中进行的。
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask);
sendBulkToSlave
这个接口会在发送正式的同步数据之前,向Slave实例以下面的格式发送RDB文件的长度数据:
$<length>\r\n
通过这个消息,Slave实例可以在接收同步数据之前,获得整个待接收数据的大小。接下来sendBulkToSlave
接口会调用调用read
系统调用从RDB文件之中读取PROTO_IOBUF_LEN
大小的数据,并调用write
系统调用接口,将数据发送给Slave实例。当所有的数据都发送结束之后,Redis会将这个客户端的可写事件处理函数从事件循环之中移除,以结束数据的传输,最后调用putSlaveOnline
接口完成数据同步的最后操作。
void putSlaveOnline(client *slave);
putSlaveOnline
这个接口主要会执行三个逻辑的处理:
Slave状态的刷新,将Slave对应的客户端状态
client.replstate
从SLAVE_STATE_SEND_BULK
切换至在线状态SLAVE_STATE_ONLINE
。重写注册可写事件处理函数,重新为Slave对应的客户端注册可写事件回调处理函数
sendReplyToClient
,用以将执行数据全量同步期间Master产生的新命令发送给客户端,以完成数据的同步。刷新健康的Slave数量,调用
refreshGoodSlavesCount
函数接口,来执行数量刷新的逻辑。
无盘的RDB文件数据同步
前面我们在讲有盘的数据同步时介绍了,Master在传输正式的同步数据之前,会先将RDB文件的长度发送给Slave;但是对于无盘的数据同步,其本质上是相当于在RDB文件的生成过程之中直接就将数据发送给Slave,这意味着Slave无法提前获知文件的大小,这样在Slave一侧也就无法得知数据同步的传输是否已经结束。因此需要一种机制,通知Slave传输何时结束。为了解决这个问题,Redis设计了这样一种方式,在发送正式的RDB文件之前,Master会向Slave发送一段如下面格式的数据:
$EOF:<40位的随机十六进制字符串>\r\n
这条数据用于通知Slave实例,数据传输结束的标记,一旦Slave再次收到这段随机字符串,那么就以为意味着数据传输的结束。不过这种情况,需要Slave一侧的Redis版本能够支持以SLAVE_CAPA_EOF
的方式来接收数据,对于不支持SLAVE_CAPA_EOF
的Slave,只能使用有盘的数据同步。
接下来,我们就详细介绍一下在Master实例这边无盘数据同步的实现细节。
在Master一端,与有盘数据同步类似,也是通过startBgsaveReplication
接口来启动的,不过这里Redis为无盘数据同步定义了一个新的函数接口rdbSaveToSlaveSockets
,从名字可以看出,这是一个通过Socket网络连接来生成存储RDB文件的过程。
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi){if (server.aof_child_pid != -1 || server.rdb_child_pid != -1)return C_ERR;listRewind(server.slaves, &li);while ((ln = listNext(&li))){client *slave = ln->value;if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START){fds[numfds++] = slave->fd;replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset);anetBlock(NULL, slave->fd);anetSendTimeout(NULL, slave->fd, server.repl_timeout*1000);}}if ((childpid = fork()) == 0){rio slave_sockets;rioInitWithFdset(&slave_sockets, fds, numfds);rdbSaveRioWithEOFMark(&slave_sockets, NULL, rsi);rioFlush(&slave_sockets);rioFreeFdset(&slave_sockets);exitFromChild(0);}else{}}
这里从上面rdbSaveToSlavesSockets
的代码片段,我们可以总结出这个无盘数据同步的一些细节:
首先无盘数据同步也是通过启动后台子进程来异步进行的。
在开启数据传输之前,遍历
redisServer.slaves
中所有的Slave,收集处于SLAVE_STATE_WAIT_BGSAVE_START
状态的Slave,收集其对应网络连接的套接字文件描述符,并调用replicationSetupSlaveForFullResync
,将其状态切换至SLAVE_STATE_WAIT_BGSAVE_END
。创建子进程,在子进程之中通过上面收集到的套接字文件描述符调用
rioInitWithFdset
来初始化一个通用IO对象rio
,最后子进程调用
rdbSaveRioWithEOFMark
这个接口,完成RDB文件数据的生成与传输。
最后在子进程结束后,Master实例依然会通过调用updateSlavesWaitingBgsave
,不过这里和有盘数据同步的一点差异在于,有盘数据同步会将Slave对应的client
对象从SLAVE_STATE_WAIT_BGSAVE_END
状态切换至SLAVE_STATE_SEND_BULK
的状态,进入数据发送的阶段;然而对于无盘数据同步在生成RDB的过程之中,就已经将数据同步给了Slave,这样Slave实例在SLAVE_STATE_WAIT_BGSAVE_END
状态结束之后,会跳过SLAVE_STATE_SEND_BULK
状态,直接进入SLAVE_STATE_ONLINE
的状态。
Slave对于同步数据的接收
如果Slave在slaveTryPartialResynchronization
函数中获取到了Master发来的形如:
+FULLRESYNC <replid> <reploffset>\r\n
这样的返回数据,则说明此时需要进行全量的数据同步,这时缓存的Master对应的client
对象redisServer.cached_master
将会被replicationDiscardCachedMaster
函数释放。
依然是在syncWithMaster
这个处理函数之中:
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask){...psync_result = slaveTryPartialResynchronization(fd,1);if (psync_result == PSYNC_CONTINUE){return;}while(maxtries--){dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);if (dfd != -1)break;sleep(1);}aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL);...return;}
这里我们看到,如果Master实例向Slave返回需要全量数据同步,那么Slave会创建一个临时的RDB文件,并注册可读事件的处理函数readSyncBulkPayload
来接收来自Master的同步数据。
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask);
readSyncBulkPayload
这个函数首先会从Master传输的数据之中解析出前序的附加信息:
对于使用无盘数据同步的形式,则需要从
$EOF:<40位随机十六进制结束分隔符>\r\n
中解析出标记传输结束的标记字符串。对于使用有盘数据同步的形式,则需要从
$<length>\r\n
中解析出标记RDB总长度的数据信息。
接下来便是从连接之中持续读取数据,并将数据写入磁盘中的临时RDB文件之中,并判断传输是否结束。
当RDB文件传输结束时,Slave一侧会先清空键空间之中的全部数据,同时调用rdbLoad
函数从RDB文件之中完成数据的加载,最后在调用replicationCreateMasterClient
函数,从连接的套接字文件描述符之中,创建一个表示Master的客户端对象,最终完成整个数据的同步。
增量数据同步
增量数据的发送
Master实例会在PSYNC的命令处理函数syncCommand
之中来判断Slave发来的PSYNC命令,是否可以进行一次增量的数据同步。
void syncCommand(client *c){if (c->flags & CLIENT_SLAVE) return;...if (!strcasecmp(c->argv[0]->ptr, "psync")){if (masterTryPartialResynchronization(c) == C_OK) {server.stat_sync_partial_ok++;return;}else{...}}}
如果可以,便进行增量数据同步,否则便进行全量数据同步。
int masterTryPartialResynchronization(client *c);
上面这个函数,是Master判断是否可以进行增量数据同步的接口,如果可以进行增量同步,那么函数会返回C_OK
;否则的话,函数返回C_ERR
,后续则会进行全量数据的同步。masterTryPartialResyncChronization
这函数的实现逻辑较为简单:
通过从客户端命令之中解析出PSYNC的参数
<replid>
以及<reploffset>
,来判断这个Slave是否可以进行增量数据的同步:Slave发送的
<replid>
要与Master的replid
相同。Slave通告的自己的复制偏移量
<reploffset>
要恰好落在Master的积压缓冲区内。由于省去了全量数据同步的过程,因此直接将Slave的状态设置为在线的状态
SLAVE_STATE_ONLINE
。向Slave发送一条返回消息,格式为
+CONTINUS\r\n
,用于通知客户端将要进行增量数据同步。通过
addReplyReplicationBacklog
将积压缓冲区之中的数据发送给Slave。最后通过
refreshGoodsSlaveCount
函数,来刷新Slave的计次。
Slave增量数据的接收
Slave实例会在slaveTryPartialResynchronization
函数中等待从Master的返回数据。从前面介绍Master一侧的数据同步我们可以知道,如果Slave的PSYNC请求恰好可以完成一次增量的同步,Master会向Slave发送一个+CONTINUE\r\n
的返回数据,通知可以进行增量数据同步;然后会加积压缓冲区之中的数据发送给Slave。而在Slave这一端,当检测返回数据为+CONTINUE
时,也就意味着与Master重连成功,此时可以复用之前缓存的Master客户端对象redisServer.cached_master
。
void replicationResurrectCachedMaster(int newfd);
在Master积压缓冲区之中的增量同步数据,可以被认为是一系列的命令数据的集合,因此Slave在通过replicationResurrectCachedMaster
重用redisServer.cached_master
时,会为这个客户端注册可读事件的回调函数readQueryFromClient
,后续Slave将以正常处理客户端命令的形式来处理Master积压缓冲区之中的增量同步数据。




