
Redis Cluster Source Code Analysis (9): Resharding

谷竹 2020-06-26

In this article we look at how resharding is implemented. Resharding redistributes the hash slots currently owned by the cluster's master nodes.




Parameters

--cluster-slots: the number of slots to move

--cluster-from: the source node(s)

--cluster-to: the destination node

Resharding moves slots owned by the source nodes to the destination node. If --cluster-from is all, every master node acts as a source; otherwise it is the ID of a master node that should participate.

When --cluster-from is given on the command line, multiple master IDs are separated by ",".


Example

    redis-cli --cluster reshard 127.0.0.1:7000
    redis-cli --cluster reshard 127.0.0.1:7000 --cluster-slots 1000 --cluster-from all --cluster-to 4df14f590f0aed549894fda17b36d90d42f755ee

Example 1 prompts you interactively for the parameters; example 2 specifies them directly on the command line.


The resharding plan

Before executing the plan, the reshard operation prints it and asks you to type "yes" to confirm.

    Ready to move 1000 slots.
      Source nodes:
    M: ee0500d25b798768e7db2cc350110e6c7ab612b5 127.0.0.1:7000
       slots:[0-5460] (5461 slots) master
       1 additional replica(s)
    M: 0bd81e2d29bd2a7ea445a3809b44802df498d49e 127.0.0.1:7001
       slots:[5461-10922] (5462 slots) master
       1 additional replica(s)
    M: 19696e34600c1023350c860b1ba3c842a70a65d2 127.0.0.1:7002
       slots:[10923-16383] (5461 slots) master
       1 additional replica(s)
      Destination node:
    M: 4df14f590f0aed549894fda17b36d90d42f755ee 127.0.0.1:7006
       slots: (0 slots) master
      Resharding plan:
        Moving slot 0 from ee0500d25b798768e7db2cc350110e6c7ab612b5
        Moving slot 1 from ee0500d25b798768e7db2cc350110e6c7ab612b5
        Moving slot 2 from ee0500d25b798768e7db2cc350110e6c7ab612b5
        Moving slot 3 from ee0500d25b798768e7db2cc350110e6c7ab612b5
        Moving slot 4 from ee0500d25b798768e7db2cc350110e6c7ab612b5
        Moving slot 5 from ee0500d25b798768e7db2cc350110e6c7ab612b5




Note: the first half of the function mostly handles the input arguments and is rather long, so it is omitted here to save space.

    static int clusterManagerCommandReshard(int argc, char **argv) {

        /******************** argument handling omitted ********************/

        printf("\nReady to move %d slots.\n", slots);
        printf("  Source nodes:\n");
        listRewind(sources, &li);
        while ((ln = listNext(&li)) != NULL) {
            clusterManagerNode *src = ln->value;
            sds info = clusterManagerNodeInfo(src, 4);
            printf("%s\n", info);
            sdsfree(info);
        }
        printf("  Destination node:\n");
        sds info = clusterManagerNodeInfo(target, 4);
        printf("%s\n", info);
        sdsfree(info);
        table = clusterManagerComputeReshardTable(sources, slots);
        printf("  Resharding plan:\n");
        clusterManagerShowReshardTable(table);
        if (!(config.cluster_manager_command.flags &
              CLUSTER_MANAGER_CMD_FLAG_YES))
        {
            printf("Do you want to proceed with the proposed "
                   "reshard plan (yes/no)? ");
            fflush(stdout);
            char buf[4];
            int nread = read(fileno(stdin),buf,4);
            buf[3] = '\0';
            if (nread <= 0 || strcmp("yes", buf) != 0) {
                result = 0;
                goto cleanup;
            }
        }
        int opts = CLUSTER_MANAGER_OPT_VERBOSE;
        listRewind(table, &li);
        while ((ln = listNext(&li)) != NULL) {
            clusterManagerReshardTableItem *item = ln->value;
            char *err = NULL;
            result = clusterManagerMoveSlot(item->source, target, item->slot,
                                            opts, &err);
            if (!result) {
                if (err != NULL) {
                    //clusterManagerLogErr("\n%s\n", err);
                    zfree(err);
                }
                goto cleanup;
            }
        }
    cleanup:
        listRelease(sources);
        clusterManagerReleaseReshardTable(table);
        return result;
    invalid_args:
        fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
        return 0;
    }

Resharding starts in clusterManagerCommandReshard().

The function first collects and prints the source node information, then the destination node information. It then computes the reshard table (the plan) with clusterManagerComputeReshardTable() and prints it, and, unless --cluster-yes was passed, waits for you to confirm with "yes". Finally it iterates over the table and calls clusterManagerMoveSlot() for each slot, aborting on the first failure.

Below we analyze the key parts:


Computing the reshard plan

    static list *clusterManagerComputeReshardTable(list *sources, int numslots) {
        list *moved = listCreate();
        int src_count = listLength(sources), i = 0, tot_slots = 0, j;
        clusterManagerNode **sorted = zmalloc(src_count * sizeof(*sorted));
        listIter li;
        listNode *ln;
        listRewind(sources, &li);
        while ((ln = listNext(&li)) != NULL) {
            clusterManagerNode *node = ln->value;
            tot_slots += node->slots_count;
            sorted[i++] = node;
        }
        qsort(sorted, src_count, sizeof(clusterManagerNode *),
              clusterManagerSlotCountCompareDesc);
        for (i = 0; i < src_count; i++) {
            clusterManagerNode *node = sorted[i];
            float n = ((float) numslots / tot_slots * node->slots_count);
            if (i == 0) n = ceil(n);
            else n = floor(n);
            int max = (int) n, count = 0;
            for (j = 0; j < CLUSTER_MANAGER_SLOTS; j++) {
                int slot = node->slots[j];
                if (!slot) continue;
                if (count >= max || (int)listLength(moved) >= numslots) break;
                clusterManagerReshardTableItem *item = zmalloc(sizeof(*item));
                item->source = node;
                item->slot = j;
                listAddNodeTail(moved, item);
                count++;
            }
        }
        zfree(sorted);
        return moved;
    }

First the function sums how many slots the source nodes own in total (tot_slots) and sorts the sources by slot count in descending order. For each source it then computes how many slots it should give up (n): its proportional share of numslots, rounded up for the largest source and down for the others. It takes the first n slots that node owns and records each (source, slot) pair; the resulting list (moved) is the reshard plan.


Slot migration

    /* Move slots between source and target nodes using MIGRATE.
     *
     * Options:
     * CLUSTER_MANAGER_OPT_VERBOSE -- Print a dot for every moved key.
     * CLUSTER_MANAGER_OPT_COLD    -- Move keys without opening slots /
     *                                reconfiguring the nodes.
     * CLUSTER_MANAGER_OPT_UPDATE  -- Update node->slots for source/target nodes.
     * CLUSTER_MANAGER_OPT_QUIET   -- Don't print info messages. */
    static int clusterManagerMoveSlot(clusterManagerNode *source,
                                      clusterManagerNode *target,
                                      int slot, int opts, char **err)
    {
        if (!(opts & CLUSTER_MANAGER_OPT_QUIET)) {
            printf("Moving slot %d from %s:%d to %s:%d: ", slot, source->ip,
                   source->port, target->ip, target->port);
            fflush(stdout);
        }
        if (err != NULL) *err = NULL;
        int pipeline = config.cluster_manager_command.pipeline,
            timeout = config.cluster_manager_command.timeout,
            print_dots = (opts & CLUSTER_MANAGER_OPT_VERBOSE),
            option_cold = (opts & CLUSTER_MANAGER_OPT_COLD),
            success = 1;
        if (!option_cold) {
            success = clusterManagerSetSlot(target, source, slot,
                                            "importing", err);
            if (!success) return 0;
            success = clusterManagerSetSlot(source, target, slot,
                                            "migrating", err);
            if (!success) return 0;
        }
        success = clusterManagerMigrateKeysInSlot(source, target, slot, timeout,
                                                  pipeline, print_dots, err);
        if (!(opts & CLUSTER_MANAGER_OPT_QUIET)) printf("\n");
        if (!success) return 0;
        /* Set the new node as the owner of the slot in all the known nodes. */
        if (!option_cold) {
            listIter li;
            listNode *ln;
            listRewind(cluster_manager.nodes, &li);
            while ((ln = listNext(&li)) != NULL) {
                clusterManagerNode *n = ln->value;
                if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
                redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER "
                                                        "SETSLOT %d %s %s",
                                                        slot, "node",
                                                        target->name);
                success = (r != NULL);
                if (!success) return 0;
                if (r->type == REDIS_REPLY_ERROR) {
                    success = 0;
                    if (err != NULL) {
                        *err = zmalloc((r->len + 1) * sizeof(char));
                        strcpy(*err, r->str);
                        CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, *err);
                    }
                }
                freeReplyObject(r);
                if (!success) return 0;
            }
        }
        /* Update the node logical config */
        if (opts & CLUSTER_MANAGER_OPT_UPDATE) {
            source->slots[slot] = 0;
            target->slots[slot] = 1;
        }
        return 1;
    }

Slot migration has two parts: moving the data and re-binding the slot.

clusterManagerMigrateKeysInSlot() moves the data in the given slot from the source node to the target node.

The CLUSTER SETSLOT command binds the slot to the specified node.


    /* Migrate all keys in the given slot from source to target. */
    static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
                                               clusterManagerNode *target,
                                               int slot, int timeout,
                                               int pipeline, int verbose,
                                               char **err)
    {
        int success = 1;
        int replace_existing_keys = (config.cluster_manager_command.flags &
                (CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE));
        while (1) {
            char *dots = NULL;
            redisReply *reply = NULL, *migrate_reply = NULL;
            reply = CLUSTER_MANAGER_COMMAND(source, "CLUSTER "
                                            "GETKEYSINSLOT %d %d", slot,
                                            pipeline);
            success = (reply != NULL);
            if (!success) return 0;
            if (reply->type == REDIS_REPLY_ERROR) {
                success = 0;
                if (err != NULL) {
                    *err = zmalloc((reply->len + 1) * sizeof(char));
                    strcpy(*err, reply->str);
                    CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, *err);
                }
                goto next;
            }
            assert(reply->type == REDIS_REPLY_ARRAY);
            size_t count = reply->elements;
            if (count == 0) {
                freeReplyObject(reply);
                break;
            }
            if (verbose) dots = zmalloc((count+1) * sizeof(char));
            /* Calling MIGRATE command. */
            migrate_reply = clusterManagerMigrateKeysInReply(source, target,
                                                             reply, 0, timeout,
                                                             dots);
            if (migrate_reply == NULL) goto next;
            if (migrate_reply->type == REDIS_REPLY_ERROR) {
                int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL;
                int not_served = strstr(migrate_reply->str, "slot not served") != NULL;
                if (replace_existing_keys && (is_busy || not_served)) {
                    /* If the key already exists, try to migrate keys
                     * adding REPLACE option.
                     * If the key's slot is not served, try to assign slot
                     * to the target node. */
                    if (not_served)
                        clusterManagerSetSlot(source, target, slot, "node", NULL);
                    clusterManagerLogWarn("*** Target key exists. "
                                          "Replacing it for FIX.\n");
                    freeReplyObject(migrate_reply);
                    migrate_reply = clusterManagerMigrateKeysInReply(source,
                                                                     target,
                                                                     reply,
                                                                     is_busy,
                                                                     timeout,
                                                                     NULL);
                    success = (migrate_reply != NULL &&
                               migrate_reply->type != REDIS_REPLY_ERROR);
                } else success = 0;
                if (!success) {
                    if (migrate_reply != NULL) {
                        if (err) {
                            *err = zmalloc((migrate_reply->len + 1) * sizeof(char));
                            strcpy(*err, migrate_reply->str);
                        }
                        printf("\n");
                        CLUSTER_MANAGER_PRINT_REPLY_ERROR(source,
                                                          migrate_reply->str);
                    }
                    goto next;
                }
            }
            if (verbose) {
                printf("%s", dots);
                fflush(stdout);
            }
    next:
            if (reply != NULL) freeReplyObject(reply);
            if (migrate_reply != NULL) freeReplyObject(migrate_reply);
            if (dots) zfree(dots);
            if (!success) break;
        }
        return success;
    }

Data migration: CLUSTER GETKEYSINSLOT fetches a batch of keys in the given slot, and the MIGRATE command then moves them from the source node to the destination node; this repeats until the slot has no keys left.

clusterManagerMigrateKeysInReply() builds the MIGRATE command from its arguments.


MIGRATE is not a command specific to cluster management, so we will not analyze its implementation here; briefly, this is how it is used:

    MIGRATE host port key|"" destination-db timeout [COPY] [REPLACE] [AUTH password] [KEYS key [key ...]]

Atomically transfer a key from a source Redis instance to a destination Redis instance. On success the key is deleted from the original instance and is guaranteed to exist in the target instance.


Implementation of the SETSLOT command

                  else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
                      /* SETSLOT 10 MIGRATING <node ID> */
                      /* SETSLOT 10 IMPORTING <node ID> */
                      /* SETSLOT 10 STABLE */
                      /* SETSLOT 10 NODE <node ID> */
                      int slot;
                      clusterNode *n;


                      if (nodeIsSlave(myself)) {
                          addReplyError(c,"Please use SETSLOT only with masters.");
                          return;
                      }


                      if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;


                      if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
                          if (server.cluster->slots[slot] != myself) {
                              addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
                              return;
                          }
                          if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                              addReplyErrorFormat(c,"I don't know about node %s",
                                  (char*)c->argv[4]->ptr);
                              return;
                          }
                          server.cluster->migrating_slots_to[slot] = n;
                      } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
                          if (server.cluster->slots[slot] == myself) {
                              addReplyErrorFormat(c,
                                  "I'm already the owner of hash slot %u",slot);
                              return;
                          }
                          if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                              addReplyErrorFormat(c,"I don't know about node %s",
                                  (char*)c->argv[4]->ptr);
                              return;
                          }
                          server.cluster->importing_slots_from[slot] = n;
                      } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
                          /* CLUSTER SETSLOT <SLOT> STABLE */
                          server.cluster->importing_slots_from[slot] = NULL;
                          server.cluster->migrating_slots_to[slot] = NULL;
                      } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
                          /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
                          clusterNode *n = clusterLookupNode(c->argv[4]->ptr);


                          if (!n) {
                              addReplyErrorFormat(c,"Unknown node %s",
                                  (char*)c->argv[4]->ptr);
                              return;
                          }
                          /* If this hash slot was served by 'myself' before to switch
                           * make sure there are no longer local keys for this hash slot. */
                          if (server.cluster->slots[slot] == myself && n != myself) {
                              if (countKeysInSlot(slot) != 0) {
                                  addReplyErrorFormat(c,
                                      "Can't assign hashslot %d to a different node "
                                      "while I still hold keys for this hash slot.", slot);
                                  return;
                              }
                          }
                          /* If this slot is in migrating status but we have no keys
                           * for it assigning the slot to another node will clear
                           * the migrating status. */
                          if (countKeysInSlot(slot) == 0 &&
                              server.cluster->migrating_slots_to[slot])
                              server.cluster->migrating_slots_to[slot] = NULL;


                          /* If this node was importing this slot, assigning the slot to
                           * itself also clears the importing status. */
                          if (n == myself &&
                              server.cluster->importing_slots_from[slot])
                          {
                              /* This slot was manually migrated, set this node configEpoch
                               * to a new epoch so that the new version can be propagated
                               * by the cluster.
                               *
                               * Note that if this ever results in a collision with another
                               * node getting the same configEpoch, for example because a
                               * failover happens at the same time we close the slot, the
                               * configEpoch collision resolution will fix it assigning
                               * a different epoch to each node. */
                              if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
                                  serverLog(LL_WARNING,
                                      "configEpoch updated after importing slot %d", slot);
                              }
                              server.cluster->importing_slots_from[slot] = NULL;
                          }
                          clusterDelSlot(slot);
                          clusterAddSlot(n,slot);
                      } else {
                          addReplyError(c,
                              "Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP");
                          return;
                      }
                      clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
                      addReply(c,shared.ok);
                  }
    clusterDelSlot(slot);
    clusterAddSlot(n,slot);
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);

clusterDelSlot() removes the slot's current owner, clusterAddSlot() binds the slot to the specified node, and clusterDoBeforeSleep() then schedules saving the cluster configuration and updating the cluster state.


Summary

Resharding consists of two main steps: computing the reshard plan and migrating the slots. Slot migration in turn splits into data migration and slot re-binding: the data is moved with the MIGRATE command, and the slot is re-bound with the CLUSTER SETSLOT command.
