这篇文章,我们将介绍重新分片是如何实现的。重新分片就是把现有主节点所管理的槽进行重新划分。
参数
--cluster-slots:要划分的槽数
--cluster-from:源节点
--cluster-to:目标节点
重新分片就是把源节点所管理的槽划分给目标节点,如果 --cluster-from 参数为 all,表示所有主节点都参与划分,否则,为要参与划分的主节点ID
如果 --cluster-from 参数在命令行中指定,那么多个主节点以“,”分隔
示例
示例1:redis-cli --cluster reshard 127.0.0.1:7000
示例2:redis-cli --cluster reshard 127.0.0.1:7000 --cluster-slots 1000 --cluster-from all --cluster-to 4df14f590f0aed549894fda17b36d90d42f755ee
示例1,根据提示输入相关参数;
示例2,直接在命令行中指定相关参数。
分片计划
在执行分片计划之前,reshard 操作会显示分片计划信息,然后输入“yes”确认。
Ready to move 1000 slots.
  Source nodes:
M: ee0500d25b798768e7db2cc350110e6c7ab612b5 127.0.0.1:7000
   slots:[0-5460] (5461 slots) master
   1 additional replica(s)
M: 0bd81e2d29bd2a7ea445a3809b44802df498d49e 127.0.0.1:7001
   slots:[5461-10922] (5462 slots) master
   1 additional replica(s)
M: 19696e34600c1023350c860b1ba3c842a70a65d2 127.0.0.1:7002
   slots:[10923-16383] (5461 slots) master
   1 additional replica(s)
  Destination node:
M: 4df14f590f0aed549894fda17b36d90d42f755ee 127.0.0.1:7006
   slots: (0 slots) master
  Resharding plan:
    Moving slot 0 from ee0500d25b798768e7db2cc350110e6c7ab612b5
    Moving slot 1 from ee0500d25b798768e7db2cc350110e6c7ab612b5
    Moving slot 2 from ee0500d25b798768e7db2cc350110e6c7ab612b5
    Moving slot 3 from ee0500d25b798768e7db2cc350110e6c7ab612b5
    Moving slot 4 from ee0500d25b798768e7db2cc350110e6c7ab612b5
    Moving slot 5 from ee0500d25b798768e7db2cc350110e6c7ab612b5
声明:由于函数前半部分主要处理输入参数相关,且代码行数较长,为了节省文章篇幅,故省略。
static int clusterManagerCommandReshard(int argc, char **argv) {******************** 参数相关 省略 ********************printf("\nReady to move %d slots.\n", slots);printf(" Source nodes:\n");listRewind(sources, &li);while ((ln = listNext(&li)) != NULL) {clusterManagerNode *src = ln->value;sds info = clusterManagerNodeInfo(src, 4);printf("%s\n", info);sdsfree(info);}printf(" Destination node:\n");sds info = clusterManagerNodeInfo(target, 4);printf("%s\n", info);sdsfree(info);table = clusterManagerComputeReshardTable(sources, slots);printf(" Resharding plan:\n");clusterManagerShowReshardTable(table);if (!(config.cluster_manager_command.flags &CLUSTER_MANAGER_CMD_FLAG_YES)){printf("Do you want to proceed with the proposed ""reshard plan (yes/no)? ");fflush(stdout);char buf[4];int nread = read(fileno(stdin),buf,4);buf[3] = '\0';if (nread <= 0 || strcmp("yes", buf) != 0) {result = 0;goto cleanup;}}int opts = CLUSTER_MANAGER_OPT_VERBOSE;listRewind(table, &li);while ((ln = listNext(&li)) != NULL) {clusterManagerReshardTableItem *item = ln->value;char *err = NULL;result = clusterManagerMoveSlot(item->source, target, item->slot,opts, &err);if (!result) {if (err != NULL) {//clusterManagerLogErr("\n%s\n", err);zfree(err);}goto cleanup;}}cleanup:listRelease(sources);clusterManagerReleaseReshardTable(table);return result;invalid_args:fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);return 0;}
重新分片从 clusterManagerCommandReshard() 函数开始执行。
6 〜 13 行:获取源节点信息,并打印。
14 〜 17 行:获取目的节点信息,并打印。
18 〜 20 行:制定分片计划,并打印。
21 〜 34 行:输入 yes ,确认分片计划。
36 〜 49 行:执行分片计划。
下面,我们对其中的重点内容进行分析:
制定分片计划
static list *clusterManagerComputeReshardTable(list *sources, int numslots) {list *moved = listCreate();int src_count = listLength(sources), i = 0, tot_slots = 0, j;clusterManagerNode **sorted = zmalloc(src_count * sizeof(*sorted));listIter li;listNode *ln;listRewind(sources, &li);while ((ln = listNext(&li)) != NULL) {clusterManagerNode *node = ln->value;tot_slots += node->slots_count;sorted[i++] = node;}qsort(sorted, src_count, sizeof(clusterManagerNode *),clusterManagerSlotCountCompareDesc);for (i = 0; i < src_count; i++) {clusterManagerNode *node = sorted[i];float n = ((float) numslots tot_slots * node->slots_count);if (i == 0) n = ceil(n);else n = floor(n);int max = (int) n, count = 0;for (j = 0; j < CLUSTER_MANAGER_SLOTS; j++) {int slot = node->slots[j];if (!slot) continue;if (count >= max || (int)listLength(moved) >= numslots) break;clusterManagerReshardTableItem *item = zmalloc(sizeof(*item));item->source = node;item->slot = j;listAddNodeTail(moved, item);count++;}}zfree(sorted);return moved;}
先计算源节点共管理多少个槽(tot_slots),再对源节点排序,然后计算出每个源节点需要移出多少个槽(n),从各个源节点所管理的槽中取出前 n 个槽,记录每个源节点及其要移出的槽的编号,就是分片计划(moved)。
槽迁移
/* Move slots between source and target nodes using MIGRATE.
 *
 * Options:
 * CLUSTER_MANAGER_OPT_VERBOSE -- Print a dot for every moved key.
 * CLUSTER_MANAGER_OPT_COLD    -- Move keys without opening slots /
 *                                reconfiguring the nodes.
 * CLUSTER_MANAGER_OPT_UPDATE  -- Update node->slots for source/target nodes.
 * CLUSTER_MANAGER_OPT_QUIET   -- Don't print info messages.
 *
 * Returns 1 on success, 0 on failure; on failure *err (if non-NULL) may
 * receive a heap-allocated error string the caller must zfree(). */
static int clusterManagerMoveSlot(clusterManagerNode *source,
                                  clusterManagerNode *target,
                                  int slot, int opts, char**err)
{
    if (!(opts & CLUSTER_MANAGER_OPT_QUIET)) {
        printf("Moving slot %d from %s:%d to %s:%d: ", slot, source->ip,
               source->port, target->ip, target->port);
        fflush(stdout);
    }
    if (err != NULL) *err = NULL;
    int pipeline = config.cluster_manager_command.pipeline,
        timeout = config.cluster_manager_command.timeout,
        print_dots = (opts & CLUSTER_MANAGER_OPT_VERBOSE),
        option_cold = (opts & CLUSTER_MANAGER_OPT_COLD),
        success = 1;
    /* Unless COLD was requested, open the slot on both sides first:
     * target marks it "importing", source marks it "migrating". */
    if (!option_cold) {
        success = clusterManagerSetSlot(target, source, slot,
                                        "importing", err);
        if (!success) return 0;
        success = clusterManagerSetSlot(source, target, slot,
                                        "migrating", err);
        if (!success) return 0;
    }
    /* Actually move the keys stored in the slot via MIGRATE. */
    success = clusterManagerMigrateKeysInSlot(source, target, slot, timeout,
                                              pipeline, print_dots, err);
    if (!(opts & CLUSTER_MANAGER_OPT_QUIET)) printf("\n");
    if (!success) return 0;
    /* Set the new node as the owner of the slot in all the known nodes. */
    if (!option_cold) {
        listIter li;
        listNode *ln;
        listRewind(cluster_manager.nodes, &li);
        while ((ln = listNext(&li)) != NULL) {
            clusterManagerNode *n = ln->value;
            /* Only masters take slot ownership updates. */
            if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
            redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER "
                                                    "SETSLOT %d %s %s",
                                                    slot, "node",
                                                    target->name);
            success = (r != NULL);
            if (!success) return 0;
            if (r->type == REDIS_REPLY_ERROR) {
                success = 0;
                if (err != NULL) {
                    /* Copy the reply error out for the caller. */
                    *err = zmalloc((r->len + 1) * sizeof(char));
                    strcpy(*err, r->str);
                    CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, *err);
                }
            }
            freeReplyObject(r);
            if (!success) return 0;
        }
    }
    /* Update the node logical config */
    if (opts & CLUSTER_MANAGER_OPT_UPDATE) {
        source->slots[slot] = 0;
        target->slots[slot] = 1;
    }
    return 1;
}
槽迁移分为两部分,一个是数据迁移,一个是槽的绑定。
clusterManagerMigrateKeysInSlot() 函数将指定槽中的数据从源节点迁移到目标节点。
CLUSTER SETSLOT 命令绑定槽到指定的节点。
/* Migrate all keys in the given slot from source to target.
 *
 * Loops fetching up to 'pipeline' keys at a time with CLUSTER
 * GETKEYSINSLOT, then MIGRATEs them, until the slot is empty on the
 * source.  When verbose, prints one dot per moved key.  Returns 1 on
 * success, 0 on failure; *err (if non-NULL) may receive an allocated
 * error string. */
static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
                                           clusterManagerNode *target,
                                           int slot, int timeout,
                                           int pipeline, int verbose,
                                           char **err)
{
    int success = 1;
    /* In --cluster-fix / --cluster-replace mode, retry conflicting keys
     * with the REPLACE option instead of failing. */
    int replace_existing_keys = (config.cluster_manager_command.flags &
            (CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE));
    while (1) {
        char *dots = NULL;
        redisReply *reply = NULL, *migrate_reply = NULL;
        /* Fetch the next batch of keys living in this slot. */
        reply = CLUSTER_MANAGER_COMMAND(source, "CLUSTER "
                                        "GETKEYSINSLOT %d %d", slot,
                                        pipeline);
        success = (reply != NULL);
        if (!success) return 0;
        if (reply->type == REDIS_REPLY_ERROR) {
            success = 0;
            if (err != NULL) {
                *err = zmalloc((reply->len + 1) * sizeof(char));
                strcpy(*err, reply->str);
                CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, *err);
            }
            goto next;
        }
        assert(reply->type == REDIS_REPLY_ARRAY);
        size_t count = reply->elements;
        /* No keys left in the slot: migration done. */
        if (count == 0) {
            freeReplyObject(reply);
            break;
        }
        if (verbose) dots = zmalloc((count+1) * sizeof(char));
        /* Calling MIGRATE command. */
        migrate_reply = clusterManagerMigrateKeysInReply(source, target,
                                                         reply, 0, timeout,
                                                         dots);
        if (migrate_reply == NULL) goto next;
        if (migrate_reply->type == REDIS_REPLY_ERROR) {
            int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL;
            int not_served = strstr(migrate_reply->str, "slot not served") != NULL;
            if (replace_existing_keys && (is_busy || not_served)) {
                /* If the key already exists, try to migrate keys
                 * adding REPLACE option.
                 * If the key's slot is not served, try to assign slot
                 * to the target node. */
                if (not_served)
                    clusterManagerSetSlot(source, target, slot, "node", NULL);
                clusterManagerLogWarn("*** Target key exists. "
                                      "Replacing it for FIX.\n");
                freeReplyObject(migrate_reply);
                /* Retry the same batch, with REPLACE if a key was busy. */
                migrate_reply = clusterManagerMigrateKeysInReply(source,
                                                                 target,
                                                                 reply,
                                                                 is_busy,
                                                                 timeout,
                                                                 NULL);
                success = (migrate_reply != NULL &&
                           migrate_reply->type != REDIS_REPLY_ERROR);
            } else success = 0;
            if (!success) {
                if (migrate_reply != NULL) {
                    if (err) {
                        *err = zmalloc((migrate_reply->len + 1) * sizeof(char));
                        strcpy(*err, migrate_reply->str);
                    }
                    printf("\n");
                    CLUSTER_MANAGER_PRINT_REPLY_ERROR(source,
                                                      migrate_reply->str);
                }
                goto next;
            }
        }
        if (verbose) {
            printf("%s", dots);
            fflush(stdout);
        }
next:
        /* Common cleanup for both the success and error paths. */
        if (reply != NULL) freeReplyObject(reply);
        if (migrate_reply != NULL) freeReplyObject(migrate_reply);
        if (dots) zfree(dots);
        if (!success) break;
    }
    return success;
}
数据迁移。通过 CLUSTER GETKEYSINSLOT 命令获取指定槽中的 key,然后通过 MIGRATE 命令将数据从源节点迁移到目标节点。
clusterManagerMigrateKeysInReply() 函数根据参数生成 MIGRATE 命令。
MIGRATE 命令并非集群管理特有的命令,这里我们不分析其具体实现,仅简单介绍一下 MIGRATE 命令的使用:
MIGRATE host port key|"" destination-db timeout [COPY] [REPLACE] [AUTH password] [KEYS key [key ...]]
Atomically transfer a key from a source Redis instance to a destination Redis instance. On success the key is deleted from the original instance and is guaranteed to exist in the target instance.
SETSLOT 命令实现
/* CLUSTER SETSLOT branch of clusterCommand() (the enclosing function is
 * not shown in this excerpt). Handles the four sub-actions used during
 * resharding: MIGRATING / IMPORTING / STABLE / NODE. */
else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
    /* SETSLOT 10 MIGRATING <node ID> */
    /* SETSLOT 10 IMPORTING <node ID> */
    /* SETSLOT 10 STABLE */
    /* SETSLOT 10 NODE <node ID> */
    int slot;
    clusterNode *n;
    /* Slot state is only ever manipulated on masters. */
    if (nodeIsSlave(myself)) {
        addReplyError(c,"Please use SETSLOT only with masters.");
        return;
    }
    if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;
    if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
        /* Only the current owner may mark a slot as migrating out. */
        if (server.cluster->slots[slot] != myself) {
            addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
            return;
        }
        if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
            addReplyErrorFormat(c,"I don't know about node %s",
                                (char*)c->argv[4]->ptr);
            return;
        }
        server.cluster->migrating_slots_to[slot] = n;
    } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
        /* Importing makes no sense if we already own the slot. */
        if (server.cluster->slots[slot] == myself) {
            addReplyErrorFormat(c,
                "I'm already the owner of hash slot %u",slot);
            return;
        }
        if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
            addReplyErrorFormat(c,"I don't know about node %s",
                                (char*)c->argv[4]->ptr);
            return;
        }
        server.cluster->importing_slots_from[slot] = n;
    } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
        /* CLUSTER SETSLOT <SLOT> STABLE */
        server.cluster->importing_slots_from[slot] = NULL;
        server.cluster->migrating_slots_to[slot] = NULL;
    } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
        /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
        clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
        if (!n) {
            addReplyErrorFormat(c,"Unknown node %s",
                                (char*)c->argv[4]->ptr);
            return;
        }
        /* If this hash slot was served by 'myself' before to switch
         * make sure there are no longer local keys for this hash slot. */
        if (server.cluster->slots[slot] == myself && n != myself) {
            if (countKeysInSlot(slot) != 0) {
                addReplyErrorFormat(c,
                    "Can't assign hashslot %d to a different node "
                    "while I still hold keys for this hash slot.", slot);
                return;
            }
        }
        /* If this slot is in migrating status but we have no keys
         * for it assigning the slot to another node will clear
         * the migrating status. */
        if (countKeysInSlot(slot) == 0 &&
            server.cluster->migrating_slots_to[slot])
            server.cluster->migrating_slots_to[slot] = NULL;
        /* If this node was importing this slot, assigning the slot to
         * itself also clears the importing status. */
        if (n == myself &&
            server.cluster->importing_slots_from[slot])
        {
            /* This slot was manually migrated, set this node configEpoch
             * to a new epoch so that the new version can be propagated
             * by the cluster.
             *
             * Note that if this ever results in a collision with another
             * node getting the same configEpoch, for example because a
             * failover happens at the same time we close the slot, the
             * configEpoch collision resolution will fix it assigning
             * a different epoch to each node. */
            if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
                serverLog(LL_WARNING,
                    "configEpoch updated after importing slot %d", slot);
            }
            server.cluster->importing_slots_from[slot] = NULL;
        }
        /* Rebind the slot to its new owner. */
        clusterDelSlot(slot);
        clusterAddSlot(n,slot);
    } else {
        addReplyError(c,
            "Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP");
        return;
    }
    /* Persist the new slot layout and refresh cluster state. */
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
    addReply(c,shared.ok);
}
clusterDelSlot(slot);
clusterAddSlot(n,slot);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
将槽绑定到指定的节点上,然后保存集群配置。
总结
重新分片主要分为两部分:制定分片计划、槽迁移。槽迁移又分为数据迁移和槽的绑定,数据迁移通过 MIGRATE 命令实现,槽的绑定通过 CLUSTER SETSLOT 命令实现。




