暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Redis源码学习(72)-Redis主从复制(4)

马基雅维利incoding 2021-08-09
521

数据同步

Slave实例向Master实例申请数据同步

Slave实例在syncWithMaster
函数中完成建立与Master实例的连接之后,便会将redisServer.repl_state
这个状态字段切换至REPL_STATE_SEND_PSYNC
这个状态,表示Slave实例将要向Master发起数据同步请求。

    void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    ...
    if (server.repl_state == REPL_STATE_SEND_PSYNC)
    {
    if (slaveTryPartialResynchronization(fd, 0) == PSYNC_WRITE_ERROR)
    {
    goto wirte_error;
    }
    server.repl_state = REPL_STATE_RECEIVE_PSYNC;
    return;
    }

    if (server.repl_state != REPL_STATE_RECEIVE_PSYNC)
    {
    goto error;
    }
    psync_result = slaveTryPartialResynchronization(fd,1);
    ...
    }

    在上面这段代码片段之中,syncWithMaster
    会分两次调用slaveTryPartialResynchronization
    接口向Master尝试请求数据同步:

    1. 第一次调用会向Master发起PSYNC命令,用以通报自身的复制信息。如果这个Slave是在连接断开之后重新连接的情况,则会将redisServer.cached_master
      之中缓存的同步信息包括replid
      以及offset
      通报给Master,由Master判断是进行增量数据同步还是全量的数据同步。


        int slaveTryPartialResynchronization(int fd, int read_reply)
        {
        //发送命令
        if (!read_reply)
        {

        return PSYNC_WAIT_REPLY;
        }
        ...
        }
      • 第二次调用则是接收从Master返回的PSYNC命令的结果:


          int slaveTryPartialResynchronization(int fd, int read_reply)
          {
          ...
          // 读取命令返回
          reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
          aeDeleteFileEvent(server.el,fd,AE_READABLE);
          if (!strncmp(reply,"+FULLRESYNC",11))
          {
          //解析replid以及offset
          ...
          replicationDiscardCachedMaster();
          return PSYNC_FULLRESYNC;
          }

          if (!strncmp(reply,"+CONTINUE",9))
          {
          ...
          replicationResurrectCachedMaster(fd);
          if (server.repl_backlog == NULL) createReplicationBacklog();
          return PSYNC_CONTINUE;
          }
          ...
          }
          1. +FULLRESYNC <replid> <offset>
            ,形如这个格式的返回,表示只能进行全量同步,同时Master实例会通告自己的replid
            以及repl_offset
            信息。

          2. +CONTINUE <replid>
            ,形如这个格式的返回,表示可以进行增量数据同步,Master实例会通过自己的replid
            Slave

        而在Master一段,则是相当于一次处理PSYNC命令的过程。PSYNC命令用于Slave实例在通过REPLCONF命令向Master实例设置了复制相关数据之后向Master请求数据同步,这个命令也是一条内部命令,由Slave实例一侧自动执行,该命令的格式为:

        PSYNC <replid> <reploffset>

        这个命令之中:

        1. <replid>
          指定了Matser实例对应的复制ID,如果Slave发送的<replid>
          ?
          ,则表示是第一次建立连接,需要进行一次全量的数据同步

        2. <reploffset>
          指定了Slave自身的复制偏移,同于通知Master是否需要进行一次全量的数据同步

        PSYNC命令是通过下面这个syncCommand
        函数来实现的:

          void syncCommand(client *c);

          我们可以来看一下这个函数的一些代码片段,借此窥探PSYNC在处理MasterSlave建立连接的一些细节:

            void syncCommand(client *c)
            {
            if (c->flags & CLIENT_SLAVE) return;
            if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED)
            {
            ...
            return;
            }

            if (clientHasPendingReplies(c))
            {
            ...
            return;
            }

            //检查是否可以进行增量的数据同步

            c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
            if (server.repl_disable_tcp_nodelay)
            anetDisableTcpNoDelay(NULL, c->fd);
            c->repldbfd = -1;
            c->flags |= CLIENT_SLAVE;
            listAddNodeTail(server.slaves, c);
            if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
            {
            chanegReplicationId();
            createReplicationBacklog();
            }

            //处理全量同步的细节数据
            }

            通过上面的代码片段,我们可以发现:

            1. Master开始处理PSYNC命令之前,Slave实例对应的客户端client
              与其他的客户端没有任何区别,并不具有CLIENT_SLAVE
              这个特殊的标记。

            2. 一个Slave实例也可以接收PSYNC命令,用于处理Sub-Slave的连接,但是只能在处于REPL_STATE_CONNECTED
              状态下才可以处理PSYNC命令。

            3. 如果Slave实例对应的客户端client
              之中,还有数据在输出缓存里没有被发送到网络上的话,暂时不能执行PSYNC命令。

            4. 只有在PSYNC命令执行之后,才会给Slave对应的client
              对象设置上CLIENT_SLAVE
              这个特殊的标记掩码,并且将这个客户端对象加入到redisServer.slaves
              这个服务器双端链表之中。

            5. 当第一个Slave实例加入到redisServer.slaves
              之中,并且没有分配积压缓冲区的话,会调用createReplicationBacklog
              接口来创建积压缓冲区。

            执行数据同步

            在前面介绍的PSYNC命令,仅介绍了一部分关于建立MasterSlave建立连接的过程,而数据同步的相关流程的一部分逻辑也是在PSYNC命令的对应函数接口之中实现的。接下来我们就来看一下全量数据同步增量数据同步的相关内容。

            全量数据同步

            我们继续来展示一段syncCommand
            接口的代码片段:

              void syncCommand(client *c)
              {
              ...
              //如果无法进行增量数据同步,那么后续会开始处理全量同步的逻辑
              c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
              ...
              if (server.rdb_child_pid != -1 && server.rdb_child_type == RDB_CHILD_TYPE_DISK)
              {
              ...
              listRewind(server.slaves, &li);
              while((ln = listNext(&li)))
              {
              slave = ln->value;
              if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
              }


              if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa))
              {
              copyClientOutputBuffer(c, slave);
              replicationSetupForFullResync(c, slave->psync_inirial_offset);
              }
              }
              else if (server.rdb_child_pid != -1 && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
              {


              }
              else
              {
              if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF))
              {

              }
              else
              {
              if (server.aof_child_pid == -1)
              {
              startBgsaveForReplication(c->slave_capa);
              }
              }
              }
              }

              这里在我们可以总结出开启数据同步的逻辑与流程:

              1. 初始将Slave实例设置为SLAVE_STATE_WAIT_BGSAVE_START
                状态,表示当前Slave实例正在等待Master实例开始生成RDB文件。

              2. 如果Master当前有后台子进程在向磁盘上生成RDB文件,那么当前Master上所有的Slave,如果有Slave处于SLAVE_STATE_WAIT_BGSAVE_END
                的状态,这意味着这次RDB文件是另外一个Slave由于全量数据同步而请求的。对于这种情况,如果两个Slaveclient.slave_capa
                相同,想么后面请求数据同步Slave将会复用前面Slave所请求的这次RDB文件。

              3. 如果Master当前有后台子进程在通过Socket来生成RDB文件,那么说明有其他的Slave在进行无盘的数据同步,这种情况下不会在开启新的数据同步

              4. 对于没有后台进程生成RDB文件的Master实例,检查服务器是否配置了server.repl_diskless_sync
                ,如果服务器配置了无盘数据同步,同时Slave支持SLAVE_CAPA_EOF
                ,那么Master会通过replicationCron
                接口在下一次心跳之中处理无盘的数据同步

              5. 如果服务器没有开启无盘数据同步,或者Slave不支持SLAVE_CAPA_EOF
                ,其在没有后台子进程重写AOF的情况下,通过调用startBgSaveForReplication
                接口来开启同步。

              Master开启数据同步生成RDB文件的逻辑是通过startBgsaveForReplication
              这个函数接口来实现的:

                int startBgsaveForReplication(int mincapa);

                这个函数会根据Master实例上redisServer.repl_diskless_sync
                的配置以及Slave实例通过REPLCONF通报的对应客户端client.slave_capa
                的配置,通过创建子进程的方式进行有盘的RDB文件生成,或是通过网络采用无盘的RDB文件生成逻辑。

                在子进程的处理数据的流程开始之后,Master会通过下面这个函数通知Slave实例请求同步的结果:

                  int replicationSetupSlaveForFullResync(client *slave, long long offset);

                  调用这个函数,会将Slave对应的客户端client.replstate
                  字段设置为SLAVE_STATE_WAIT_BGSAVE_END
                  状态,表示其正在等待RDB文件生成结束,同时设置这个Slave对应客户端对象client.psync_initial_offset
                  初始复制偏移字段为系统当前的复制偏移量。最后向Slave发送一条返回信息,消息的格式也就是前面介绍的:

                  +FULLRESYNC <replid> <offset>

                  这条返回信息告知Slave实例一侧,需要进行一次全量的数据同步,并且将当前Master的复制ID以及复制偏移量一并发送给Slave实例。

                  有盘的RDB文件数据同步

                  对于有盘RDB文件的数据同步Redis会启动一个后台子进程生成RDB文件并将这个文件存储到磁盘上。当文件生成结束后,在通过网络连接将这个文件之中的数据发送到对应的一个或者多个Slave实例上。startBgsaveReplication
                  函数会通过前面我们在介绍RDB数据持久化时所介绍的rdbSaveBackground
                  函数接口来异步生成RDB文件,在子进程结束的回调函数backgroundSaveDoneHandlerDisk
                  中会调用updateSlavesWaitingBgsave
                  来处理等待RDB生成的那些Slave对象。

                    void updateSlavesWaitingBgsave(int bgsaveerr, int type);

                    对于等待RDB文件的Slave,会将其对应的客户端状态client.replstate
                    SLAVE_STATE_WIAT_BGSAVE_END
                    的状态切换至SLAVE_STATE_SEND_BULK
                    的状态,这个新状态表示这个的Slave客户端进入了传输同步数据的状态。同时在Master实例上为这个Slave对应的client
                    对象在事件循环之中注册可写事件的处理函数sendBulkToSlave
                    ,这也就意味着基于有盘RDB文件的数据同步,数据的传输是在Redis的主线程之中进行的。

                      void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask);

                      sendBulkToSlave
                      这个接口会在发送正式的同步数据之前,向Slave实例以下面的格式发送RDB文件的长度数据:

                      $<length>\r\n

                      通过这个消息,Slave实例可以在接收同步数据之前,获得整个待接收数据的大小。接下来sendBulkToSlave
                      接口会调用调用read
                      系统调用从RDB文件之中读取PROTO_IOBUF_LEN
                      大小的数据,并调用write
                      系统调用接口,将数据发送给Slave实例。当所有的数据都发送结束之后,Redis会将这个客户端的可写事件处理函数从事件循环之中移除,以结束数据的传输,最后调用putSlaveOnline
                      接口完成数据同步的最后操作。

                        void putSlaveOnline(client *slave);

                        putSlaveOnline
                        这个接口主要会执行三个逻辑的处理:

                        1. Slave状态的刷新,将Slave对应的客户端状态client.replstate
                          SLAVE_STATE_SEND_BULK
                          切换至在线状态SLAVE_STATE_ONLINE

                        2. 重写注册可写事件处理函数,重新为Slave对应的客户端注册可写事件回调处理函数sendReplyToClient
                          ,用以将执行数据全量同步期间Master产生的新命令发送给客户端,以完成数据的同步。

                        3. 刷新健康的Slave数量,调用refreshGoodSlavesCount
                          函数接口,来执行数量刷新的逻辑。

                        无盘的RDB文件数据同步

                        前面我们在讲有盘的数据同步时介绍了,Master在传输正式的同步数据之前,会先将RDB文件的长度发送给Slave;但是对于无盘的数据同步,其本质上是相当于在RDB文件的生成过程之中直接就将数据发送给Slave,这意味着Slave无法提前获知文件的大小,这样在Slave一侧也就无法得知数据同步的传输是否已经结束。因此需要一种机制,通知Slave传输何时结束。为了解决这个问题,Redis设计了这样一种方式,在发送正式的RDB文件之前,Master会向Slave发送一段如下面格式的数据:

                        $EOF:<40位的随机十六进制字符串>\r\n

                        这条数据用于通知Slave实例,数据传输结束的标记,一旦Slave再次收到这段随机字符串,那么就以为意味着数据传输的结束。不过这种情况,需要Slave一侧的Redis版本能够支持以SLAVE_CAPA_EOF
                        的方式来接收数据,对于不支持SLAVE_CAPA_EOF
                        Slave,只能使用有盘的数据同步

                        接下来,我们就详细介绍一下在Master实例这边无盘数据同步的实现细节。

                        Master一端,与有盘数据同步类似,也是通过startBgsaveReplication
                        接口来启动的,不过这里Redis为无盘数据同步定义了一个新的函数接口rdbSaveToSlaveSockets
                        ,从名字可以看出,这是一个通过Socket网络连接来生成存储RDB文件的过程。

                          int rdbSaveToSlavesSockets(rdbSaveInfo *rsi)
                          {
                          if (server.aof_child_pid != -1 || server.rdb_child_pid != -1)
                          return C_ERR;


                          listRewind(server.slaves, &li);
                          while ((ln = listNext(&li)))
                          {
                          client *slave = ln->value;
                          if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START)
                          {
                          fds[numfds++] = slave->fd;
                          replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset);
                          anetBlock(NULL, slave->fd);
                          anetSendTimeout(NULL, slave->fd, server.repl_timeout*1000);
                          }
                          }


                          if ((childpid = fork()) == 0)
                          {
                          rio slave_sockets;
                          rioInitWithFdset(&slave_sockets, fds, numfds);

                          rdbSaveRioWithEOFMark(&slave_sockets, NULL, rsi);

                          rioFlush(&slave_sockets);


                          rioFreeFdset(&slave_sockets);
                          exitFromChild(0);
                          }
                          else
                          {


                          }
                          }

                          这里从上面rdbSaveToSlavesSockets
                          的代码片段,我们可以总结出这个无盘数据同步的一些细节:

                          1. 首先无盘数据同步也是通过启动后台子进程来异步进行的。

                          2. 在开启数据传输之前,遍历redisServer.slaves
                            中所有的Slave,收集处于SLAVE_STATE_WAIT_BGSAVE_START
                            状态的Slave,收集其对应网络连接的套接字文件描述符,并调用replicationSetupSlaveForFullResync
                            ,将其状态切换至SLAVE_STATE_WAIT_BGSAVE_END

                          3. 创建子进程,在子进程之中通过上面收集到的套接字文件描述符调用rioInitWithFdset
                            来初始化一个通用IO对象rio

                          4. 最后子进程调用rdbSaveRioWithEOFMark
                            这个接口,完成RDB文件数据的生成与传输。

                          最后在子进程结束后,Master实例依然会通过调用updateSlavesWaitingBgsave
                          ,不过这里和有盘数据同步的一点差异在于,有盘数据同步会将Slave对应的client
                          对象从SLAVE_STATE_WAIT_BGSAVE_END
                          状态切换至SLAVE_STATE_SEND_BULK
                          的状态,进入数据发送的阶段;然而对于无盘数据同步在生成RDB的过程之中,就已经将数据同步给了Slave,这样Slave实例在SLAVE_STATE_WAIT_BGSAVE_END
                          状态结束之后,会跳过SLAVE_STATE_SEND_BULK
                          状态,直接进入SLAVE_STATE_ONLINE
                          的状态。

                          Slave对于同步数据的接收

                          如果SlaveslaveTryPartialResynchronization
                          函数中获取到了Master发来的形如:

                          +FULLRESYNC <replid> <reploffset>\r\n

                          这样的返回数据,则说明此时需要进行全量的数据同步,这时缓存的Master对应的client
                          对象redisServer.cached_master
                          将会被replicationDiscardCachedMaster
                          函数释放。

                          依然是在syncWithMaster
                          这个处理函数之中:

                            void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask)
                            {
                            ...
                            psync_result = slaveTryPartialResynchronization(fd,1);
                            if (psync_result == PSYNC_CONTINUE)
                            {
                            return;
                            }
                            while(maxtries--)
                            {
                            dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
                            if (dfd != -1)
                            break;
                            sleep(1);
                            }
                            aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL);
                            ...
                            return;
                            }

                            这里我们看到,如果Master实例向Slave返回需要全量数据同步,那么Slave会创建一个临时的RDB文件,并注册可读事件的处理函数readSyncBulkPayload
                            来接收来自Master的同步数据。

                              void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask);

                              readSyncBulkPayload
                              这个函数首先会从Master传输的数据之中解析出前序的附加信息:

                              1. 对于使用无盘数据同步的形式,则需要从$EOF:<40位随机十六进制结束分隔符>\r\n
                                中解析出标记传输结束的标记字符串。

                              2. 对于使用有盘数据同步的形式,则需要从$<length>\r\n
                                中解析出标记RDB总长度的数据信息。

                              接下来便是从连接之中持续读取数据,并将数据写入磁盘中的临时RDB文件之中,并判断传输是否结束。

                              RDB文件传输结束时,Slave一侧会先清空键空间之中的全部数据,同时调用rdbLoad
                              函数从RDB文件之中完成数据的加载,最后在调用replicationCreateMasterClient
                              函数,从连接的套接字文件描述符之中,创建一个表示Master的客户端对象,最终完成整个数据的同步。

                              增量数据同步

                              增量数据的发送

                              Master实例会在PSYNC的命令处理函数syncCommand
                              之中来判断Slave发来的PSYNC命令,是否可以进行一次增量的数据同步

                                void syncCommand(client *c)
                                {
                                if (c->flags & CLIENT_SLAVE) return;
                                ...
                                if (!strcasecmp(c->argv[0]->ptr, "psync"))
                                {
                                if (masterTryPartialResynchronization(c) == C_OK) {
                                server.stat_sync_partial_ok++;
                                return;
                                }
                                else
                                {
                                ...
                                }
                                }
                                }

                                如果可以,便进行增量数据同步,否则便进行全量数据同步

                                int masterTryPartialResynchronization(client *c);

                                上面这个函数,是Master判断是否可以进行增量数据同步的接口,如果可以进行增量同步,那么函数会返回C_OK
                                ;否则的话,函数返回C_ERR
                                ,后续则会进行全量数据的同步。masterTryPartialResyncChronization
                                这函数的实现逻辑较为简单:

                                1. 通过从客户端命令之中解析出PSYNC的参数<replid>
                                  以及<reploffset>
                                  ,来判断这个Slave是否可以进行增量数据的同步:

                                  1. Slave发送的<replid>
                                    要与Masterreplid
                                    相同。

                                  2. Slave通告的自己的复制偏移量<reploffset>
                                    要恰好落在Master的积压缓冲区内。

                                2. 由于省去了全量数据同步的过程,因此直接将Slave的状态设置为在线的状态SLAVE_STATE_ONLINE

                                3. Slave发送一条返回消息,格式为+CONTINUS\r\n
                                  ,用于通知客户端将要进行增量数据同步

                                4. 通过addReplyReplicationBacklog
                                  将积压缓冲区之中的数据发送给Slave

                                5. 最后通过refreshGoodsSlaveCount
                                  函数,来刷新Slave的计次。

                                Slave增量数据的接收

                                Slave实例会在slaveTryPartialResynchronization
                                函数中等待从Master的返回数据。从前面介绍Master一侧的数据同步我们可以知道,如果SlavePSYNC请求恰好可以完成一次增量的同步,Master会向Slave发送一个+CONTINUE\r\n
                                的返回数据,通知可以进行增量数据同步;然后会加积压缓冲区之中的数据发送给Slave。而在Slave这一端,当检测返回数据为+CONTINUE
                                时,也就意味着与Master重连成功,此时可以复用之前缓存的Master客户端对象redisServer.cached_master

                                  void replicationResurrectCachedMaster(int newfd);

                                  Master积压缓冲区之中的增量同步数据,可以被认为是一系列的命令数据的集合,因此Slave在通过replicationResurrectCachedMaster
                                  重用redisServer.cached_master
                                  时,会为这个客户端注册可读事件的回调函数readQueryFromClient
                                  ,后续Slave将以正常处理客户端命令的形式来处理Master积压缓冲区之中的增量同步数据。


                                  文章转载自马基雅维利incoding,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                  评论