暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

redis 集群源码解析(10)数据读写

谷竹 2020-06-27
302

前面几篇文章,我们介绍了集群管理相关操作是如何实现的。这篇文章,我们介绍数据读写的源码实现。




示例

    redis-cli -c -p 7000
    127.0.0.1:7000> set hello world
    127.0.0.1:7000> get hello


    我们已经知道,数据分布在集群的各个节点上,那么,集群是如何决定数据保存在哪个节点上?又是如何知道去哪个节点上读取数据呢?


      /* If cluster is enabled perform the cluster redirection here.
       * However we don't perform the redirection if:
       * 1) The sender of this command is our master (CLIENT_MASTER flag).
       * 2) The sender is a Lua client (CLIENT_LUA) and server.lua_caller
       *    carries the CLIENT_MASTER flag.
       * 3) The command has no key arguments (no getkeys_proc and
       *    firstkey == 0). EXEC is the exception: it is always checked,
       *    even though it has no key arguments of its own. */
      if (server.cluster_enabled &&
          !(c->flags & CLIENT_MASTER) &&
          !(c->flags & CLIENT_LUA &&
            server.lua_caller->flags & CLIENT_MASTER) &&
          !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
            c->cmd->proc != execCommand))
      {
          /* NOTE: hashslot and error_code are left uninitialized here and
           * are only passed by address to getNodeByQuery(). */
          int hashslot;
          int error_code;
          clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                          &hashslot,&error_code);
          /* Either no node can serve the query (NULL) or the node that
           * should serve it is not this one: reply with the redirection
           * or error instead of executing the command. */
          if (n == NULL || n != server.cluster->myself) {
              /* EXEC calls discardTransaction(); any other command calls
               * flagTransaction() (presumably to make a later EXEC of an
               * open MULTI fail -- confirm against multi.c). */
              if (c->cmd->proc == execCommand) {
                  discardTransaction(c);
              } else {
                  flagTransaction(c);
              }
              clusterRedirectClient(c,n,hashslot,error_code);
              return C_OK;
          }
      }

      上面的代码引用自 processCommand() 函数。processCommand() 函数是命令处理函数,也就是执行命令的入口函数。

      从上面的代码可以看出,要执行一条命令,先根据命令找到执行命令的节点,如果节点不是当前节点,则重定向到执行命令的节点。


      我们先来看 getNodeByQuery() 函数,该函数决定命令在哪个节点上执行。

        /* Return the node that should serve the query described by 'cmd',
         * 'argv' and 'argc' issued by client 'c'.
         *
         * Return value:
         *   - 'myself' when the query can be served locally: a module disabled
         *     redirections, EXEC was sent outside MULTI, the query has no keys,
         *     MIGRATE targets an open slot, an ASKING request hits an importing
         *     slot, or a READONLY client reads from a slave of the owner.
         *   - another node when the client must be redirected; '*error_code'
         *     is then CLUSTER_REDIR_MOVED or CLUSTER_REDIR_ASK.
         *   - NULL when the query cannot be served at all; '*error_code' is
         *     then CLUSTER_REDIR_DOWN_UNBOUND, CLUSTER_REDIR_CROSS_SLOT,
         *     CLUSTER_REDIR_DOWN_STATE or CLUSTER_REDIR_UNSTABLE.
         *
         * 'hashslot' and 'error_code' may be NULL. '*hashslot' is written only
         * after the key scan succeeded and the cluster state is CLUSTER_OK; on
         * the earlier return paths it is left untouched. */
        clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
            clusterNode *n = NULL;
            robj *firstkey = NULL;
            int multiple_keys = 0;
            multiState *ms, _ms;
            multiCmd mc;
            int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;

            /* Modules can turn off Redis Cluster redirection: this is useful
             * when writing a module that implements a completely different
             * distributed system. In that case allow any key to be set. */
            if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
                return myself;

            /* Set error code optimistically for the base case. */
            if (error_code) *error_code = CLUSTER_REDIR_NONE;

            /* We handle all the cases as if they were EXEC commands, so we have
             * a common code path for everything */
            if (cmd->proc == execCommand) {
                /* If CLIENT_MULTI flag is not set EXEC is just going to return an
                 * error. */
                if (!(c->flags & CLIENT_MULTI)) return myself;
                ms = &c->mstate;
            } else {
                /* In order to have a single codepath create a fake Multi State
                 * structure if the client is not in MULTI/EXEC state, this way
                 * we have a single codepath below. */
                ms = &_ms;
                _ms.commands = &mc;
                _ms.count = 1;
                mc.argv = argv;
                mc.argc = argc;
                mc.cmd = cmd;
            }

            /* Check that all the keys are in the same hash slot, and obtain this
             * slot and the node associated. */
            for (i = 0; i < ms->count; i++) {
                struct redisCommand *mcmd;
                robj **margv;
                int margc, *keyindex, numkeys, j;

                mcmd = ms->commands[i].cmd;
                margc = ms->commands[i].argc;
                margv = ms->commands[i].argv;

                keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
                for (j = 0; j < numkeys; j++) {
                    robj *thiskey = margv[keyindex[j]];
                    int thisslot = keyHashSlot((char*)thiskey->ptr,
                                               sdslen(thiskey->ptr));

                    if (firstkey == NULL) {
                        /* This is the first key we see. Check what is the slot
                         * and node. */
                        firstkey = thiskey;
                        slot = thisslot;
                        n = server.cluster->slots[slot];

                        /* Error: If a slot is not served, we are in "cluster down"
                         * state. However the state is yet to be updated, so this was
                         * not trapped earlier in processCommand(). Report the same
                         * error to the client. */
                        if (n == NULL) {
                            getKeysFreeResult(keyindex);
                            if (error_code)
                                *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
                            return NULL;
                        }

                        /* If we are migrating or importing this slot, we need to check
                         * if we have all the keys in the request (the only way we
                         * can safely serve the request, otherwise we return a TRYAGAIN
                         * error). To do so we set the importing/migrating state and
                         * increment a counter for every missing key. */
                        if (n == myself &&
                            server.cluster->migrating_slots_to[slot] != NULL)
                        {
                            migrating_slot = 1;
                        } else if (server.cluster->importing_slots_from[slot] != NULL) {
                            importing_slot = 1;
                        }
                    } else {
                        /* If it is not the first key, make sure it is exactly
                         * the same key as the first we saw. */
                        if (!equalStringObjects(firstkey,thiskey)) {
                            if (slot != thisslot) {
                                /* Error: multiple keys from different slots. */
                                getKeysFreeResult(keyindex);
                                if (error_code)
                                    *error_code = CLUSTER_REDIR_CROSS_SLOT;
                                return NULL;
                            } else {
                                /* Flag this request as one with multiple different
                                 * keys. */
                                multiple_keys = 1;
                            }
                        }
                    }

                    /* Migrating or importing slot? Count keys we don't have. */
                    if ((migrating_slot || importing_slot) &&
                        lookupKeyRead(&server.db[0],thiskey) == NULL)
                    {
                        missing_keys++;
                    }
                }
                getKeysFreeResult(keyindex);
            }

            /* No key at all in command? then we can serve the request
             * without redirections or errors in all the cases. */
            if (n == NULL) return myself;

            /* Cluster is globally down but we got keys? We can't serve the request. */
            if (server.cluster->state != CLUSTER_OK) {
                if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
                return NULL;
            }

            /* Return the hashslot by reference. */
            if (hashslot) *hashslot = slot;

            /* MIGRATE always works in the context of the local node if the slot
             * is open (migrating or importing state). We need to be able to freely
             * move keys among instances in this case. */
            if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
                return myself;

            /* If we don't have all the keys and we are migrating the slot, send
             * an ASK redirection. */
            if (migrating_slot && missing_keys) {
                if (error_code) *error_code = CLUSTER_REDIR_ASK;
                return server.cluster->migrating_slots_to[slot];
            }

            /* If we are receiving the slot, and the client correctly flagged the
             * request as "ASKING", we can serve the request. However if the request
             * involves multiple keys and we don't have them all, the only option is
             * to send a TRYAGAIN error. */
            if (importing_slot &&
                (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
            {
                if (multiple_keys && missing_keys) {
                    if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
                    return NULL;
                } else {
                    return myself;
                }
            }

            /* Handle the read-only client case reading from a slave: if this
             * node is a slave and the request is about an hash slot our master
             * is serving, we can reply without redirection. */
            if (c->flags & CLIENT_READONLY &&
                (cmd->flags & CMD_READONLY || cmd->proc == evalCommand ||
                 cmd->proc == evalShaCommand) &&
                nodeIsSlave(myself) &&
                myself->slaveof == n)
            {
                return myself;
            }

            /* Base case: just return the right node. However if this node is not
             * myself, set error_code to MOVED since we need to issue a redirection. */
            if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
            return n;
        }

        该函数较长,但我们的重点是:该函数是如何决定命令在哪个节点上执行的?

          robj *thiskey = margv[keyindex[j]];
          int thisslot = keyHashSlot((char*)thiskey->ptr,
                                     sdslen(thiskey->ptr));

          key 经过哈希计算得到所在的槽。

            slot = thisslot;
            n = server.cluster->slots[slot];

            查找管理该槽的节点。

              /* Base case: just return the right node. However if this node is not
               * myself, set error_code to MOVED since we need to issue a rediretion. */
              if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
              return n;

              返回该节点,如果该节点不是当前节点,则设置错误码为:

              CLUSTER_REDIR_MOVED。

                /* If it is not the first key, make sure it is exactly
                 * the same key as the first we saw. */
                if (!equalStringObjects(firstkey,thiskey)) {
                    if (slot != thisslot) {
                        /* Error: multiple keys from different slots. */
                        getKeysFreeResult(keyindex);
                        if (error_code)
                            *error_code = CLUSTER_REDIR_CROSS_SLOT;
                        return NULL;
                    } else {
                        /* Flag this request as one with multiple different
                         * keys. */
                        multiple_keys = 1;
                    }
                }

                注意:如果命令中包含多个 key,则这些 key 必须在同一个槽中,否则,命令不能执行。

                  /* No key at all in command? then we can serve the request
                   * without redirections or errors in all the cases. */
                  if (n == NULL) return myself;

                  当然,如果命令中没有 key,则命令在当前节点即可执行。


                  接下来,再看 clusterRedirectClient() 函数,该函数返回重定向信息。

                    /* Send client 'c' the error reply that corresponds to
                     * 'error_code', as produced by getNodeByQuery().
                     *
                     * 'n' and 'hashslot' are only read for the MOVED and ASK
                     * codes, where they form the "<slot> <ip>:<port>" part of
                     * the reply. Any code not handled below triggers
                     * serverPanic(). */
                    void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
                        if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
                            addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
                            return;
                        }

                        /* Several keys in one slot, but the slot is being
                         * migrated or imported right now, so it is not stable. */
                        if (error_code == CLUSTER_REDIR_UNSTABLE) {
                            addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
                            return;
                        }

                        if (error_code == CLUSTER_REDIR_DOWN_STATE) {
                            addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
                            return;
                        }

                        if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
                            addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n"));
                            return;
                        }

                        /* Real redirections: both share one reply layout and
                         * differ only in the leading keyword. */
                        if (error_code == CLUSTER_REDIR_MOVED ||
                            error_code == CLUSTER_REDIR_ASK)
                        {
                            const char *keyword = "MOVED";
                            if (error_code == CLUSTER_REDIR_ASK) keyword = "ASK";

                            sds reply = sdscatprintf(sdsempty(),
                                "-%s %d %s:%d\r\n",
                                keyword,hashslot,n->ip,n->port);
                            addReplySds(c,reply);
                            return;
                        }

                        serverPanic("getNodeByQuery() unknown error.");
                    }

                    重定向信息包含:"MOVED" 或 "ASK"、slot、ip、port,例如:-MOVED 3999 127.0.0.1:6381。


                    返回重定向信息给客户端之后,客户端又是如何处理的呢?


                      /* Check if we need to connect to a different node and reissue the
                       * request.
                       *
                       * A redirection reply has the form "MOVED <slot> <ip>:<port>" or
                       * "ASK <slot> <ip>:<port>", so both keywords must be matched as a
                       * prefix. A whole-string strcmp() against "ASK" can never match
                       * such a reply, which would leave ASK redirections unfollowed. */
                      if (config.cluster_mode && reply->type == REDIS_REPLY_ERROR &&
                          (!strncmp(reply->str,"MOVED",5) || !strncmp(reply->str,"ASK",3)))
                      {
                          char *p = reply->str, *s;
                          int slot;


                          output = 0;
                          /* Comments show the position of the pointer as:
                           *
                           * [S] for pointer 's'
                           * [P] for pointer 'p'
                           */
                          s = strchr(p,' ');      /* MOVED[S]3999 127.0.0.1:6381 */
                          p = strchr(s+1,' ');    /* MOVED[S]3999[P]127.0.0.1:6381 */
                          *p = '\0';
                          slot = atoi(s+1);
                          s = strrchr(p+1,':');    /* MOVED 3999[P]127.0.0.1[S]6381 */
                          *s = '\0';
                          /* Point the connection settings at the node named in the reply. */
                          sdsfree(config.hostip);
                          config.hostip = sdsnew(p+1);
                          config.hostport = atoi(s+1);
                          if (config.interactive)
                              printf("-> Redirected to slot [%d] located at %s:%d\n",
                                  slot, config.hostip, config.hostport);
                          /* Flag the command for re-sending. */
                          config.cluster_reissue_command = 1;
                          cliRefreshPrompt();
                      }

                      上面的代码引用自 cliReadReply() 函数。

                      从上面的代码可以看出,当响应是以 "MOVED" 开头的错误回复时(代码的意图是同样处理 "ASK" 重定向),客户端从回复中解析出 slot、ip 和 port,更新 config.hostip 与 config.hostport,并设置为重新发送命令状态:config.cluster_reissue_command = 1


                        /* Send the command, retrying once after a forced reconnect
                         * when sending fails, and send it again for as long as
                         * config.cluster_reissue_command is found set in cluster
                         * mode after a send.
                         *
                         * Returns REDIS_ERR when the command still cannot be sent
                         * after the reconnect, REDIS_OK otherwise. */
                        static int issueCommandRepeat(int argc, char **argv, long repeat) {
                            for (;;) {
                                /* Cleared before every attempt, so a set flag
                                 * afterwards was raised during this send. */
                                config.cluster_reissue_command = 0;

                                if (cliSendCommand(argc,argv,repeat) != REDIS_OK) {
                                    /* First attempt failed: force a reconnect and
                                     * try exactly once more. */
                                    cliConnect(CC_FORCE);

                                    /* If we still cannot send the command print error.
                                     * We'll try to reconnect the next time. */
                                    if (cliSendCommand(argc,argv,repeat) != REDIS_OK) {
                                        cliPrintContextError();
                                        return REDIS_ERR;
                                    }
                                }

                                /* No redirection pending: we are done. */
                                if (!config.cluster_mode || !config.cluster_reissue_command)
                                    return REDIS_OK;

                                /* Redirected in cluster mode: reconnect, then
                                 * loop to issue the command again. */
                                cliConnect(CC_FORCE);
                            }
                        }
                          /* Issue the command again if we got redirected in cluster mode */
                          if (config.cluster_mode && config.cluster_reissue_command) {
                             cliConnect(CC_FORCE);
                          }

                          当 config.cluster_reissue_command = 1 时,客户端重新连接到正确的节点,然后发送命令。


                          总结

                          当客户端连接到服务端读写数据时,服务端根据 key 来判断数据是否在当前节点,如果不在,则返回重定向信息,客户端根据该信息连接到正确的节点,进行数据读写。



                          文章转载自谷竹,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                          评论