暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

来聊聊去中心化Redis集群节点如何完成通信

67

写在文章开头

今天我们来聊点有意思的,关于redis中集群间通信的设计与实现,本文将从源码的角度分析redis
集群节点如何利用Gossip
协议完成节点间的通信与传播,希望对你有帮助。

Hi,我是 sharkChili ,是个不断在硬核技术上作死的技术人,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili 。


同时也非常欢迎你star我的开源项目mini-redis:https://github.com/shark-ctrl/mini-redis

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注  “加群”  即可和笔者和笔者的朋友们进行深入交流。

详解Redis集群节点通信的设计与实现

详解Gossip协议

在此之前我们先简单介绍一下Gossip
协议,该协议是分布式集群的一种通信协议,我们都知道管理集群的方式有中心化和去中心化两种方式,中心化的方式是通过第一个第三方的管理中心,例如zookeeper
等来维护一份集群节点的信息、状态:

redis
采用的是去中心化的方式实现集群节点通信,即通过Gossip协议
进行节点通信,让各个节点之间两两通信,广播与自己保持交流的节点,由此将节点串联起来构成一张关系网:

我们以一个简单的场景为例介绍一下Gossip
协议,默认情况下我们的当前有3个节点的集群,各个节点彼此按照通信要求发送自己的信息和与自己保持交流的节点,由此将有限的资源共享出去构成一个集群。

此时,我们需要横向扩展一个节点4,我们只需配置/redis-cli --cluster add-node 新节点IP:新节点端口 任意存活节点IP:任意存活节点端口
,这个存活节点后续和其他节点通信时,就会将当前新添加的节点4发送出去,由此其他节点收到这个消息并存储下来,经过各个节点的不断反复通信,这个集群中的各个节点就会拥有集群中所有节点的信息。

集群消息协定

任何通信都是需要按照协议规范进行,redis
集群也一样,为了保证节点间通信的规范,redis
要求集群节点通信的消息的类型可以是以下几种:

  1. ping
    消息,用来向其他节点发送节点信息。
  2. 回复ping
    pong
    消息。
  3. 如果当前节点中存在新添加的节点,则通过meet
    格式的消息发送给其他节点。
  4. 如果节点出现故障,则发送fail
    消息告知集群其他节点。

对此我们给出消息的宏定义代码,位于cluster.h
中:

//集群中的ping
#define CLUSTERMSG_TYPE_PING 0          /* Ping */
//集群中的pong
#define CLUSTERMSG_TYPE_PONG 1          /* Pong (reply to Ping) */
//想加入集群的节点
#define CLUSTERMSG_TYPE_MEET 2          /* Meet "let's join" message */
//某个节点有故障
#define CLUSTERMSG_TYPE_FAIL 3          /* Mark node xxx as failing */

集群节点消息体

后续集群都会通过clusterMsg
来表示一条消息,它记录消息长度以及发送节点名称、负责的slots
以及节点端口号等信息:

typedef struct {
    char sig[4];       
    //消息总长度
    uint32_t totlen;  
   //......
    //消息类型
    uint16_t type;     
    //......
    //发送节点的名称
    char sender[REDIS_CLUSTER_NAMELEN]; 
    //发送节点负责的slots
    unsigned char myslots[REDIS_CLUSTER_SLOTS/8];
    //......
    char notused1[32];  
    //节点端口
    uint16_t port;     
    //......
    //记录消息的消息体
    union clusterMsgData data;
} clusterMsg;

这里我们对这个消息体clusterMsgData
进行展开说明一下,可以看到他用一段共用体维护各种类型消息的结构,这其中我们只需要了解的是ping
消息,从注释可以看到ping
消息这个结构体可以发送ping
meet
pong
等类型消息,ping
消息类型其内部用clusterMsgDataGossip
数组维护,这一点这个消息可以包含多个节点信息存于数组中:

union clusterMsgData {
   //可以发送ping meet pong的消息,该结构体内部有clusterMsgDataGossip数组,这意味这个结构体可以存放多个节点的消息
    struct {
        /* Array of N clusterMsgDataGossip structures */
        clusterMsgDataGossip gossip[1];
    } ping;

    //......
};

步入clusterMsgDataGossip
即可看到这个结构体存储的是需要发送给它人的节点名称、ping
和收到ping
的时间以及端口号等信息:

typedef struct {
    char nodename[REDIS_CLUSTER_NAMELEN];//节点名称
    uint32_t ping_sent; //发送ping的事件
    uint32_t pong_received;//收到pong的事件
    char ip[REDIS_IP_STR_LEN];  //广播的节点ip
    uint16_t port;          //节点与客户端进行通信的端口
    //......
} clusterMsgDataGossip;

我们来简单小结一下,假设我们的某个节点向其他节点发送ping
消息告知自己维护的节点信息和状态,那么对应的消息格式大体如下图所示:

详解集群节点ping流程

集群节点的指向流程也是交由redis
的时间事件serverCron
执行,它会每个100ms
执行一次集群的定任务clusterCron
方法,其内部会检查这个定时任务是否执行了10次,一旦执行10次(也就是100ms*10即每1秒)后就会随机从当前节点维护的其他节点信息字典表中抽取5个节点,找到最早回复pong
给当前节点发送一条ping
消息:

对此我们给出定时执行的serverCron
函数,可以看到其内部每100ms
执行一次集群定时任务clusterCron

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    //......
    //100ms执行一次集群的函数 
    run_with_period(100) {
        if (server.cluster_enabled) clusterCron();
    }
 //......
}

我们步入clusterCron
即可看到,该定时任务会随机抽取5个节点然后找到最早给该节点发送pong的节点发送ping消息包:

void clusterCron(void) {
   //......
    // 每10次即每过去1s执行一次这段逻辑
    if (!(iteration % 10)) {
        int j;

       
        //随机选出5个节点
        for (j = 0; j < 5; j++) {
            de = dictGetRandomKey(server.cluster->nodes);
            clusterNode *this = dictGetVal(de);

            /* Don't ping nodes disconnected or with a ping currently active. */
            //断连、或者自己、或者正在握手的节点不处理
            if (this->link == NULL || this->ping_sent != 0continue;
            if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE))
                continue;
            //选择最早收到pong的节点    
            if (min_pong_node == NULL || min_pong > this->pong_received) {
                min_pong_node = this;
                min_pong = this->pong_received;
            }
        }
        //向最早收到pong的调用clusterSendPing发送消息
        if (min_pong_node) {
            redisLog(REDIS_DEBUG,"Pinging node %.40s", min_pong_node->name);
            clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
        }
    }

   //......
}

步入clusterSendPing
即可看到我们所说的核心逻辑,即按照公式计算出要发送给最早回复pong
的节点对应节点数,然后封装成消息发送出去:

void clusterSendPing(clusterLink *link, int type) {
    //......
    //我们希望添加的最大节点数,集群总是减去自己和正在握手的
    int freshnodes = dictSize(server.cluster->nodes)-2;

      //......
    //计算wanted
    wanted = floor(dictSize(server.cluster->nodes)/10);
    if (wanted < 3) wanted = 3;
    if (wanted > freshnodes) wanted = freshnodes;

      //......

    /* Populate the header. */
    //设置ping消息头,构建端口号、slot等信息
    if (link->node && type == CLUSTERMSG_TYPE_PING)
        link->node->ping_sent = mstime();
    clusterBuildMessageHdr(hdr,type);

    /* Populate the gossip fields */
    int maxiterations = wanted*3;
    //基于maxiterations进行循环随机抽取自己维护的节点信息并组装
    while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
        dictEntry *de = dictGetRandomKey(server.cluster->nodes);
        clusterNode *this = dictGetVal(de);
        clusterMsgDataGossip *gossip;
        int j;

       //如果是自己则跳过
        if (this == myself) continue;

       //故障节点不发送
        if (maxiterations > wanted*2 &&
            !(this->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL)))
            continue;

        //....

       
        freshnodes--;
        
        //组装当前节点的名称、ip、端口等信息存到hdr所指向的消息结构体
        
        //指向gossip某个索引位置设置名称、ip、端口等
        gossip = &(hdr->data.ping.gossip[gossipcount]);
        memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
        gossip->ping_sent = htonl(this->ping_sent);
        gossip->pong_received = htonl(this->pong_received);
        memcpy(gossip->ip,this->ip,sizeof(this->ip));
        gossip->port = htons(this->port);
        gossip->flags = htons(this->flags);
        gossip->notused1 = 0;
        gossip->notused2 = 0;
        gossipcount++;
    }

     //......
     //创建一个发送事件提交给redis发送出去
    clusterSendMessage(link,buf,totlen);
    zfree(buf);
}

等待pong消息回复并解析

每个集群的节点都会定时检查和对端链接的连接是否断开,如果断开的尝试异步非阻塞向其发送建立连接请求,并注册一个处理器clusterReadHandler
处理对端的ping
等消息,所以我们上文的ping
消息实际上就是通过这个函数进行解析读取:

对此我们给出这段源码的入口即可集群的定时任务clusterCron
方法,可以看到其内部会便利当前节点通信的节点,查看连接是否为空,若为空则发起连接并注册clusterReadHandler
处理消息:

void clusterCron(void) {
    //......

    
    di = dictGetSafeIterator(server.cluster->nodes);
    //遍历与当前节点保持通信的节点
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        //如果连接为空则非阻塞发起连接,然后注册clusterReadHandler处理对端节点的消息
        if (node->link == NULL) {
            int fd;
            mstime_t old_ping_sent;
            clusterLink *link;

            fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
                node->port+REDIS_CLUSTER_PORT_INCR, REDIS_BIND_ADDR);
            //......
            //创建链接对应存储数据的空间
            link = createClusterLink(node);
            link->fd = fd;
            node->link = link;
            //为这个链接注册clusterReadHandler处理发送的消息
            aeCreateFileEvent(server.el,link->fd,AE_READABLE,
                    clusterReadHandler,link);
            //......
        }
    }
    
}

步入clusterReadHandler
即可看到redis
服务端解析消息存储到buf
并通过clusterProcessPacket
解析的逻辑:

void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    //......

    while(1) { /* Read as long as there is data to read. */
       //......
       //hdr指向link->rcvbuf
       hdr = (clusterMsg*) link->rcvbuf;
        //读取消息到buf即link->rcvbuf中
        nread = read(fd,buf,readlen);
        //......

        
        if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
            //调用clusterProcessPacket解析这个连接的消息,即 link->rcvbuf
            if (clusterProcessPacket(link)) {
                sdsfree(link->rcvbuf);
                link->rcvbuf = sdsempty();
            } else {
                return/* Link no longer valid. */
            }
        }
    }
}

clusterProcessPacket
即是该方法的核心所在,它会将对端节点发送的消息进行解析与处理,这里我们就以收到pong
消息为例说明一下流程,假设回复pong
的是master
节点,它会更新收到这条网络连接pong
响应时间,然后解析报文内容,如果发现有个节点不在我们的节点列表中,将其存入node
字典表中:

int clusterProcessPacket(clusterLink *link) {
    //......

    /* Perform sanity checks */
    //消息完整性校验
   //......

    /* Check if the sender is a known node. */
    //检查发送节点是否是已知节点
    sender = clusterLookupNode(hdr->sender);
    //......

   //......

    /* PING, PONG, MEET:消息处理逻辑 */
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
      //......
  
  //如果收到pong则更新pong_received为当前时间
        if (link->node && type == CLUSTERMSG_TYPE_PONG) {
            link->node->pong_received = mstime();
            link->node->ping_sent = 0;

           //......
        }

      
  //......

       
        //如果当前节点是已知节点,则调用clusterProcessGossipSection查看当前pong消息中的内容是否包含未知的、新加入的节点
        if (sender) clusterProcessGossipSection(hdr,link);
    } else if (type == CLUSTERMSG_TYPE_FAIL) {
        //......
    }
 //......    
    return 1;
}

步入clusterProcessGossipSection
即可看到该函数会遍历消息中的节点,一旦发现该节点是新添加节点则调用clusterStartHandshake
其存入nodes
字典表中:

void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
    uint16_t count = ntohs(hdr->count);
    //解析当前节点gossip消息内容
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
 //遍历node
    while(count--) {
     //......
     //打印当前节点信息
        redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
            g->nodename,
            g->ip,
            ntohs(g->port),
            ci);
      
        node = clusterLookupNode(g->nodename);
        if (node) {//已知节点处理,如果不可通信才握手重连
           //......
        } else {//未知节点则发起握手,若握手建立通信成功则将其存入nodes字典中
            //......
            if (sender &&
                !(flags & REDIS_NODE_NOADDR) &&
                !clusterBlacklistExists(g->nodename))
            {
                clusterStartHandshake(g->ip,ntohs(g->port));
            }
        }

      //走到下一个节点
        g++;
    }
}

我们给出clusterStartHandshake
中将其存入server
cluster
nodes
字典表的逻辑:

int clusterStartHandshake(char *ip, int port) {
    //......
 //如果处于握手中,则说明之前已经发现并进行通信了,直接返回
    if (clusterHandshakeInProgress(norm_ip,port)) {
        errno = EAGAIN;
        return 0;
    }

 //基于消息创建node结构其,并调用clusterAddNode将其存入server.cluster->nodes字典表中
    n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
    memcpy(n->ip,norm_ip,sizeof(n->ip));
    n->port = port;
    clusterAddNode(n);
    return 1;
}

小结

来简单小结一下Redis集群节点如何通过Gossip协议构建集群网络的:

  1. 新节点通过meet和集群中某个节点a建立连接。
  2. 当前节点执行clusterCron
    定时任务时,随机抽取5个节点并找到最早回复pong
    的实例,假设是节点a,发送ping消息。
  3. 注册clusterReadHandler
    处理器其他节点发送的消息。
  4. 收到节点a的pong
    消息回复,判断查看该节点是否是已知节点,如果是则调用clusterProcessGossipSection
    解析报文内容,如果存在新节点则进行握手通信,如果连接建立成功则将该节点存入当前实例的nodes
    节点中。

我是 sharkchili ,CSDN Java 领域博客专家mini-redis的作者,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。


同时也非常欢迎你star我的开源项目mini-redis:https://github.com/shark-ctrl/mini-redis

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注  “加群”  即可和笔者和笔者的朋友们进行深入交流。

参考

《Redis 源码剖析与实战》


文章转载自写代码的SharkChili,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论