前面几篇文章,我们介绍了集群管理相关操作是如何实现的。这篇文章,我们介绍数据读写的源码实现。
示例
redis-cli -c -p 7000
127.0.0.1:7000> set hello world
127.0.0.1:7000> get hello
我们已经知道,数据分布在集群的各个节点上,那么,集群是如何决定数据保存在哪个节点上?又是如何知道去哪个节点上读取数据呢?
/* Cluster mode: make sure this node is the one that should serve the
 * command, otherwise answer with a redirection. The check is skipped when:
 * 1) The sender of this command is our master (directly, or as the caller
 *    of the running Lua script).
 * 2) The command has no key arguments (EXEC is still checked). */
if (server.cluster_enabled &&
    !(c->flags & CLIENT_MASTER) &&
    !(c->flags & CLIENT_LUA &&
      server.lua_caller->flags & CLIENT_MASTER) &&
    !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
      c->cmd->proc != execCommand))
{
    int slot;
    int redir_err;
    clusterNode *target = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                         &slot,&redir_err);

    if (target == NULL || target != server.cluster->myself) {
        /* The command will not run here: EXEC drops the queued
         * transaction, any other command flags it. */
        if (c->cmd->proc == execCommand) {
            discardTransaction(c);
        } else {
            flagTransaction(c);
        }
        clusterRedirectClient(c,target,slot,redir_err);
        return C_OK;
    }
}
上面的代码引用自 processCommand() 函数。processCommand() 函数是命令处理函数,也就是执行命令的入口函数。
从上面的代码可以看出,要执行一条命令,先根据命令找到执行命令的节点,如果节点不是当前节点,则重定向到执行命令的节点。
我们先来看 getNodeByQuery() 函数,该函数决定命令在哪个节点上执行。
/* Return the node that should serve the command described by
 * cmd/argv/argc for client 'c'.
 *
 * Returns 'myself' when the request can be served locally, another node
 * when the client has to be redirected, or NULL when the request cannot
 * be served at all.
 *
 * If 'error_code' is not NULL it is set to CLUSTER_REDIR_NONE first and
 * then, depending on the outcome, to one of CLUSTER_REDIR_DOWN_UNBOUND,
 * CLUSTER_REDIR_CROSS_SLOT, CLUSTER_REDIR_DOWN_STATE, CLUSTER_REDIR_ASK,
 * CLUSTER_REDIR_UNSTABLE or CLUSTER_REDIR_MOVED.
 *
 * If 'hashslot' is not NULL it receives the slot of the keys, but only
 * when at least one key was found and the cluster state is CLUSTER_OK;
 * on the earlier return paths it is left untouched. */
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
    clusterNode *n = NULL;
    robj *firstkey = NULL;
    int multiple_keys = 0;
    multiState *ms, _ms;
    multiCmd mc;
    int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;

    /* Allow any key to be set if a module disabled cluster redirections.
     * Modules can turn off Redis Cluster redirection: this is useful
     * when writing a module that implements a completely different
     * distributed system. */
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
        return myself;

    /* Set error code optimistically for the base case. */
    if (error_code) *error_code = CLUSTER_REDIR_NONE;

    /* We handle all the cases as if they were EXEC commands, so we have
     * a common code path for everything */
    if (cmd->proc == execCommand) {
        /* If CLIENT_MULTI flag is not set EXEC is just going to return an
         * error. */
        if (!(c->flags & CLIENT_MULTI)) return myself;
        ms = &c->mstate;
    } else {
        /* In order to have a single codepath create a fake Multi State
         * structure if the client is not in MULTI/EXEC state, this way
         * we have a single codepath below. */
        ms = &_ms;
        _ms.commands = &mc;
        _ms.count = 1;
        mc.argv = argv;
        mc.argc = argc;
        mc.cmd = cmd;
    }

    /* Check that all the keys are in the same hash slot, and obtain this
     * slot and the node associated. */
    for (i = 0; i < ms->count; i++) {
        struct redisCommand *mcmd;
        robj **margv;
        int margc, *keyindex, numkeys, j;

        mcmd = ms->commands[i].cmd;
        margc = ms->commands[i].argc;
        margv = ms->commands[i].argv;

        keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
        for (j = 0; j < numkeys; j++) {
            robj *thiskey = margv[keyindex[j]];
            int thisslot = keyHashSlot((char*)thiskey->ptr,
                                       sdslen(thiskey->ptr));

            if (firstkey == NULL) {
                /* This is the first key we see. Check what is the slot
                 * and node. */
                firstkey = thiskey;
                slot = thisslot;
                n = server.cluster->slots[slot];

                /* Error: If a slot is not served, we are in "cluster down"
                 * state. However the state is yet to be updated, so this was
                 * not trapped earlier in processCommand(). Report the same
                 * error to the client. */
                if (n == NULL) {
                    getKeysFreeResult(keyindex);
                    if (error_code)
                        *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
                    return NULL;
                }

                /* If we are migrating or importing this slot, we need to check
                 * if we have all the keys in the request (the only way we
                 * can safely serve the request, otherwise we return a TRYAGAIN
                 * error). To do so we set the importing/migrating state and
                 * increment a counter for every missing key. */
                if (n == myself &&
                    server.cluster->migrating_slots_to[slot] != NULL)
                {
                    migrating_slot = 1;
                } else if (server.cluster->importing_slots_from[slot] != NULL) {
                    importing_slot = 1;
                }
            } else {
                /* If it is not the first key, make sure it is exactly
                 * the same key as the first we saw. */
                if (!equalStringObjects(firstkey,thiskey)) {
                    if (slot != thisslot) {
                        /* Error: multiple keys from different slots. */
                        getKeysFreeResult(keyindex);
                        if (error_code)
                            *error_code = CLUSTER_REDIR_CROSS_SLOT;
                        return NULL;
                    } else {
                        /* Flag this request as one with multiple different
                         * keys. */
                        multiple_keys = 1;
                    }
                }
            }

            /* Migrating or importing slot? Count keys we don't have. */
            if ((migrating_slot || importing_slot) &&
                lookupKeyRead(&server.db[0],thiskey) == NULL)
            {
                missing_keys++;
            }
        }
        getKeysFreeResult(keyindex);
    }

    /* No key at all in command? then we can serve the request
     * without redirections or errors in all the cases. */
    if (n == NULL) return myself;

    /* Cluster is globally down but we got keys? We can't serve the request. */
    if (server.cluster->state != CLUSTER_OK) {
        if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
        return NULL;
    }

    /* Return the hashslot by reference. */
    if (hashslot) *hashslot = slot;

    /* MIGRATE always works in the context of the local node if the slot
     * is open (migrating or importing state). We need to be able to freely
     * move keys among instances in this case. */
    if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
        return myself;

    /* If we don't have all the keys and we are migrating the slot, send
     * an ASK redirection. */
    if (migrating_slot && missing_keys) {
        if (error_code) *error_code = CLUSTER_REDIR_ASK;
        return server.cluster->migrating_slots_to[slot];
    }

    /* If we are receiving the slot, and the client correctly flagged the
     * request as "ASKING", we can serve the request. However if the request
     * involves multiple keys and we don't have them all, the only option is
     * to send a TRYAGAIN error. */
    if (importing_slot &&
        (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
    {
        if (multiple_keys && missing_keys) {
            if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            return myself;
        }
    }

    /* Handle the read-only client case reading from a slave: if this
     * node is a slave and the request is about an hash slot our master
     * is serving, we can reply without redirection. */
    if (c->flags & CLIENT_READONLY &&
        (cmd->flags & CMD_READONLY || cmd->proc == evalCommand ||
         cmd->proc == evalShaCommand) &&
        nodeIsSlave(myself) &&
        myself->slaveof == n)
    {
        return myself;
    }

    /* Base case: just return the right node. However if this node is not
     * myself, set error_code to MOVED since we need to issue a redirection. */
    if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
    return n;
}
该函数较长,但我们的重点是:该函数是如何决定命令在哪个节点上执行的?
/* Pick the j-th key of the command and compute the hash slot it maps to. */
robj *thiskey = margv[keyindex[j]];
int thisslot = keyHashSlot((char*)thiskey->ptr,
                           sdslen(thiskey->ptr));
key 经过哈希计算得到所在的槽。
/* Remember the slot and fetch the node recorded for it in the slot table. */
slot = thisslot;
n = server.cluster->slots[slot];
查找管理该槽的节点。
/* Base case: hand back the node found for the slot. When that node is
 * not this instance, report MOVED so that a redirection is issued. */
if (error_code && n != myself) {
    *error_code = CLUSTER_REDIR_MOVED;
}
return n;
返回该节点。如果该节点不是当前节点,则设置错误码为 CLUSTER_REDIR_MOVED。
/* Not the first key: unless it is the very same key as the first one,
 * it must at least hash to the same slot. */
if (!equalStringObjects(firstkey,thiskey)) {
    if (slot == thisslot) {
        /* Same slot, different key: flag the request as one with
         * multiple different keys. */
        multiple_keys = 1;
    } else {
        /* Error: multiple keys from different slots. */
        getKeysFreeResult(keyindex);
        if (error_code)
            *error_code = CLUSTER_REDIR_CROSS_SLOT;
        return NULL;
    }
}
注意:如果命令中包含多个 key,则这些 key 必须在同一个槽中,否则,命令不能执行。
/* The command carried no key at all: it can be served by this node
 * without redirections or errors in all the cases. */
if (n == NULL) {
    return myself;
}
当然,如果命令中没有 key,则命令在当前节点即可执行。
接下来,再看 clusterRedirectClient() 函数,该函数返回重定向信息。
/* Send client 'c' the error reply that corresponds to 'error_code'.
 *
 * For CLUSTER_REDIR_MOVED and CLUSTER_REDIR_ASK the reply carries
 * 'hashslot' plus the ip and port of node 'n'; for every other handled
 * code a fixed error string is sent and 'n' / 'hashslot' are not read.
 * An unrecognized code triggers serverPanic(). */
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
    if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
        addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
        return;
    }

    if (error_code == CLUSTER_REDIR_UNSTABLE) {
        /* Several keys of the same slot were requested while that slot
         * is in the middle of a migration or import, so it is not
         * "stable" right now. */
        addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
        return;
    }

    if (error_code == CLUSTER_REDIR_DOWN_STATE) {
        addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
        return;
    }

    if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
        addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n"));
        return;
    }

    if (error_code == CLUSTER_REDIR_MOVED ||
        error_code == CLUSTER_REDIR_ASK)
    {
        const char *kind = (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED";

        addReplySds(c,sdscatprintf(sdsempty(),
            "-%s %d %s:%d\r\n",
            kind,hashslot,n->ip,n->port));
        return;
    }

    serverPanic("getNodeByQuery() unknown error.");
}
重定向信息包含:"MOVED"(或 "ASK")、slot、ip、port。
返回重定向信息给客户端之后,客户端又是如何处理的呢?
/* Check if we need to connect to a different node and reissue the
 * request.
 *
 * The redirection is only applied when the reply has the expected
 * "<KIND> <slot> <ip>:<port>" layout; a reply missing one of the
 * separators is left untouched instead of being parsed through a NULL
 * pointer.
 *
 * NOTE(review): !strcmp(reply->str,"ASK") only matches a reply that is
 * exactly "ASK". Such a reply has no fields to parse, and a reply of
 * the form "ASK <slot> <ip>:<port>" never matches it. The comparison is
 * left as it is on purpose: following an ASK redirection presumably also
 * requires sending ASKING to the target node first, which this code
 * does not do -- confirm before changing it. */
if (config.cluster_mode && reply->type == REDIS_REPLY_ERROR &&
    (!strncmp(reply->str,"MOVED",5) || !strcmp(reply->str,"ASK")))
{
    char *p = reply->str, *s;
    char *port_sep = NULL;

    /* Comments show the position of the pointer as:
     *
     * [S] for pointer 's'
     * [P] for pointer 'p'
     */
    s = strchr(p,' ');                          /* MOVED[S]3999 127.0.0.1:6381 */
    p = (s != NULL) ? strchr(s+1,' ') : NULL;   /* MOVED[S]3999[P]127.0.0.1:6381 */
    if (p != NULL)
        port_sep = strrchr(p+1,':');            /* MOVED 3999[P]127.0.0.1[:]6381 */

    if (port_sep != NULL) {
        int slot;

        output = 0;
        /* Terminate the slot and the ip substrings in place so that
         * they can be read as separate C strings. */
        *p = '\0';
        slot = atoi(s+1);
        *port_sep = '\0';
        sdsfree(config.hostip);
        config.hostip = sdsnew(p+1);
        config.hostport = atoi(port_sep+1);
        if (config.interactive)
            printf("-> Redirected to slot [%d] located at %s:%d\n",
                   slot, config.hostip, config.hostport);
        config.cluster_reissue_command = 1;
        cliRefreshPrompt();
    }
}
上面的代码引用自 cliReadReply() 函数。
从上面的代码可以看出,当错误响应以 "MOVED" 开头时,客户端从响应中解析出目标节点的 ip 和 port,并设置为重新发送命令状态:config.cluster_reissue_command = 1
/* Send the command given by argc/argv ('repeat' is passed through to
 * cliSendCommand()), reissuing it for as long as the reply handling
 * reports a cluster redirection.
 *
 * Returns REDIS_OK once the command went through without asking for a
 * reissue, or REDIS_ERR when sending fails even after a forced
 * reconnection. */
static int issueCommandRepeat(int argc, char **argv, long repeat) {
    for (;;) {
        config.cluster_reissue_command = 0;

        if (cliSendCommand(argc,argv,repeat) != REDIS_OK) {
            /* First attempt failed: force a reconnection and try once
             * more. If we still cannot send the command print the error;
             * we'll try to reconnect the next time. */
            cliConnect(CC_FORCE);
            if (cliSendCommand(argc,argv,repeat) != REDIS_OK) {
                cliPrintContextError();
                return REDIS_ERR;
            }
        }

        /* Done unless we got redirected in cluster mode. */
        if (!(config.cluster_mode && config.cluster_reissue_command)) {
            return REDIS_OK;
        }

        /* Reconnect, then loop to issue the command again. */
        cliConnect(CC_FORCE);
    }
}
/* We got redirected in cluster mode: reconnect so that the command can
 * be issued again. */
if (config.cluster_mode && config.cluster_reissue_command) {
    cliConnect(CC_FORCE);
}
当 config.cluster_reissue_command = 1 时,客户端重新连接到正确的节点,然后发送命令。
总结
当客户端连接到服务端读写数据时,服务端根据 key 来判断数据是否在当前节点,如果不在,则返回重定向信息,客户端根据该信息连接到正确的节点,进行数据读写。




