暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Redis源码学习(65)-Redis客户端C语言库(下)

马基雅维利incoding 2021-06-23
1593

同步API代码实现

首先我们看一下redisContext
这个数据结构的各个字段的具体含义:

  1. redisContext.err
    ,用于记录这个连接上的错误码,当连接没有错误时,这个字段会被设置为0。

  2. redisContext.errstr
    ,当连接上发生错误的话,这个字符串字段存储了错误的具体信息。

  3. redisContext.fd
    ,表示这个连接对应的文件描述符。

  4. redisContext.flags
    ,记录这个Redis连接上的状态信息:

    1. REDIS_BLOCK
      ,这个掩码表示当前连接是否为阻塞连接,对于同步API的连接都是阻塞模式,而异步API连接则是使用非阻塞模式。

    2. REDIS_CONNECTED
      ,这一位掩码表示当前的连接是否已经建立,同步API中调用connect
      系统调用建立连接之后,或者个异步API中在连接的回调之中会给这个连接设置这个掩码。

    3. REDIS_DISCONNECTING
      ,这个掩码用于异步API之中,当尝试关闭连接的时候,会为连接设置这个掩码。在这种状态下,连接将不会继续接受新的查询命令请求,同时会刷新并清理输出缓冲区之中的数据,并将网络之中读取并解析所有的返回数据。

    4. REDIS_FREEING
      ,这个掩码用于异步API之中,当这个连接要被释放时,会给这个连接设置这个掩码,表示这个连接应该尽快被清理。

    5. REDIS_IN_CALLBACK
      ,这个掩码用于异步API之中,表示这个连接正在执行查询命令的回调函数。

    6. REDIS_SUBSCRIBED
      ,用于异步API之中,表示这个连接有一个或者更多的订阅操作。

    7. REDIS_MONITORING
      ,当使用监视器模式的时候,需要设置这个掩码。

    8. REDIS_REUSEADDR
      ,当对应的连接采用地址重用模式的时候,需要为这个连接设置这个掩码。

  5. redisContext.obufs
    ,这是一个动态的sds
    类型的数据,作为连接的输出缓冲区。

  6. redisContext.reader
    ,连接的输入缓冲区,redisReader
    这个对象类型集成了对于数据的解析功能。

  7. redisContext.connection_type
    ,记录这个连接所使用的连接类型:

    1. REDIS_CONN_TCP
      ,表明这是一个Socket网络套接字。

    2. REDIS_CONN_UNIX
      ,表明这是一个Unix本地套接字。

建立连接

deps/hiredis/net.h以及deps/hiredis/net.c这两个文件之中定义了Hiredis库中关于网络连接的一些接口。首先在deps/hiredis/net.c源文件之中,定义了若干关于网络操作的静态函数接口,由于面前对于Redis服务器的底层网络API已经做过介绍,因此这里仅对相关函数进行一个简单的罗列:

静态函数接口功能
int redisCreateSocket(redisContext *c, int type)
redisContext
对象创建一个Socket文件描述符
void redisContextCloseFd(redisContext *c)
关闭redisContext
对象上的Socket文件描述符
int redisSetReuseAddr(redisContext *c)
设置redisContext
对象连接地址可重用
int redisSetBlocking(redisContext *c, int blocking)
设置redisContext
对象连接的阻塞状态
int redisSetTcpNoDelay(redisContext *c)
关闭redisContext
对象连接上的Nagle设置
int redisContextTimeoutMsec(redisContext *c, long *result)
获取redisContext
对象上设置的超时间事件毫秒数

除了上面这些基础的静态函数接口之外,deps/hiredis/net.c源文件之中还定义了两个比较重要的静态函数接口, 首先是等待连接就绪的函数接口:

    int redisContextWaitReady(redisContext *c, long msec)
    {
    struct pollfd wfd[1];
    wfd[0].fd = c->fd;
    wfd[0].events = POLLOUT;


    if (errno == EINPROGRESS)
    {
    int res;
    if ((res = poll(wfd, 1, msec) == -1))
    {
    ...
    }
    ...
    return REDIS_OK;
    }
    return REDIS_ERR;
    }

    这个函数的主要用途便是调用poll
    系统调用等待redisContext
    连接上的可写事件。我们在前面介绍Redis底层API的时候曾经讲解过的非阻塞connect
    操作。而在Hiredis库之中,客户端调用API向服务器发起连接都是采用非阻塞地connect
    进行的,因此我们需要一个接口来检查连接上是否触发了可写事件。redisContextWaitReady
    这个接口便是为了这个目的而实现的,如果redisContextWaitReady
    函数从poll
    系统调用的阻塞之中返回,且没有返回错误,那么说明与服务器之间的网络连接已经完全建立;否则便说明连接建立失败。

    另外一个接口便是客户端主动向服务器发起连接的函数:

      int _redisContextConnectTcp(redisContext *c, const char *addr, int port, const struct timeval *timeout, const char *source_addr)
      {
      int blocking = (c->flags & REDIS_BLOCK);
      ...
      s = socket(p->ai_family,p->ai_socktype,p->ai_protocol);
      c->fd = s;
      redisSetBlocking(c,0);
      if (connect(s,p->ai_addr,p->ai_addrlen) == -1) {
      ...
      else if (errno == EINPROGRESS && !blocking) {


      }
      ...
      else
      {
      if (redisContextWaitReady(c,timeout_msec) != REDIS_OK)
      goto error;
      }
      }
      if (blocking && redisSetBlocking(c,1) != REDIS_OK)
      goto error;
      }

      通过上面这个代码片段,我们可以发现不论redisContext.flags
      字段上是否携带了REDIS_BLOCK
      标记,客户端与服务器之间建立连接的过程都是采用非阻塞connect
      的调用来进行的,而在前面对于redisContext
      数据结构的讲解中,我们也介绍了REDIS_BLOCK
      掩码使用表示同步API还是异步API的。而在建立连接的过程之中,唯一区别在于同步的API在非阻塞connect
      之后,会通过redisContextWaitReady
      接口阻塞地等待连接建立;而在异步API之中,则不会通过redisContextWaitReady
      函数接口等待连接建立,取而代之的会将这个redisContext
      对象加入事件循环之中,通过回调函数来建立连接。

      deps/hiredis/hiredis.h以及deps/hiredis/hiredis.c这两个文件之中,定义了多干个用于发起连接的API:

        redisContext *redisConnect(const char *ip, int port);
        redisContext *redisConnectWithTimeout(const char *ip, int port, const struct timeval tv);
        redisContext *redisConnectNonBlock(const char *ip, int port);
        redisContext *redisConnectBindNonBlock(const char *ip, int port, const char *source_addr);
        redisContext *redisConnectBindNonBlockWithReuse(const char *ip, int port, const char *source_addr);

        在这几个API之中redisConnect
        以及redisConnectWithTimeout
        用于在同步API之中发起连接,而另外三个API则是用于在异步API之中向服务器发起连接。

        命令格式化

        Hiredis库对于查询命令的格式化以及发送的相关逻辑代码主要定义在deps/hiredis/hiredis.h以及deps/hiredis/hiredis.c这两个代码文件之中。

        redisContext
        这个结构体之中,redisContext.obuf
        字段代表着这个连接所对应的应用层输出缓冲区,所有的查询命令都会首先被写入这个输出缓冲区,然后再通过write
        系统调用写入网络。Hiredis给出了一组用于对输出命令进行格式化的函数接口,这组接口会按照Redis的协议格式对客户端给出的查询命令数据进行格式化:

          int redisvFormatCommand(char **target, const char *format, va_list ap);
          int redisFormatCommand(char **target, const char *format, ...);
          int redisFormatCommandArgv(char **target, int argc, const char **argv, const size_t *argvlen);
          int redisFormatSdsCommandArgv(sds *target, int argc, const char ** argv, const size_t *argvlen);

          而客户端对于流水线机制的支持,即redisAppendCommand
          系列函数便都是借用上面的这些函数接口,将格式化后的查询命令数据写入到应用层缓冲区redisContext.obuf
          之中来实现的。以redisvAppendCommand
          这个函数接口为例:

            int redisvAppendCommand(redisContext *c, const char* format, va_list ap)
            {
            char * cmd;
            int len;
            len = redisvFormatCommand(&cmd, format, ap);
            ...
            __redisAppendCommand(c, cmd, len);


            free(cmd);
            return REDIS_OK;
            }

            这里我们可以看到,redisvAppendCommand
            这个函数接口首先会通过redisvFormatCommand
            将查询命令的数据格式化到一段内存cmd
            之中,在将这段存储着格式化命令数据的内存cmd
            追加到redisContext.obuf
            输出缓冲区之中。

            而非流水线的同步命令查询的redisCommand
            系列接口,则是在redisAppendCommand
            系列函数接口的基础上来实现的,其本质就是将格式化后的查询命令追加到redisContext.obuf
            之后,立即将缓冲区之中的查询命令数据发送到网络,并同步地等待查询命令返回数据,以redisvCommand
            接口为例:

              void *redisvCommand(redisContext *c, const char *format, va_list ap)
              {
              if (redisvAppendCommand(c, format, ap) != REDIS_OK)
              return NULL;
              return __redisBlockForReply(c);
              }

              数据的发送与接收

              在底层将redisContext
              输出缓冲区之中的数据发送出去则是通过下面这个接口来实现的:

                int redisBufferWrite(redisContext *c, int *done);

                这个函数会尝试调用write
                系统调用,将redisContext.obuf
                之中的数据写入网络的内核输出缓冲区。如果输出失败,函数会返回REDIS_ERR
                ,否则会返回REDIS_OK
                。同时,redisContext.obuf
                之中的数据是否已经全部被发送,则会通过done
                参数进行返回。

                而将数据从内核缓冲区读取到应用层缓冲区的工作,则是由redisBufferRead
                这个函数来完成的:

                  int redisBufferRead(redisContext *c);

                  除了将数据从内核缓冲区读取到应用层缓冲区之外,redisBufferRead
                  这个接口还会检测服务器端是否主动断开连接。如果read
                  系统调用返回,但是读取的字节数为0,那么则意味着服务器端主动将连接断开。此时,会将redisContext.err
                  这个字段设置为REDIS_ERR_EOF
                  用以标记连接已经被断开。

                  上述的两个函数都是使用阻塞的方式调用write
                  接口以及read
                  接口来实现数据的发送与接收。而在同步API之中,数据的发送与接收都是通过介绍的redisGetReply
                  来完成的:

                    int redisGetReply(redisContext *c, void **reply)
                    {
                    if (redisGetReplyFromReader(c, &aux) == REDIS_ERR)
                    return REDIS_ERR;
                    if (aux == NULL && c->flags & REDIS_BLOCK)
                    {
                    do {
                    if (redisBufferWrite(c, *wdone) == REDIS_ERR)
                    return REDIS_ERR;
                    } while (!wdone);

                    do {
                    if (redisBufferRead(c) == REDIS_ERR)
                    return REDIS_ERR;
                    if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
                    return REDIS_ERR;
                    } while (aux == NULL);
                    }
                    }

                    以上便是关于Hiredis之中同步API的一个简要的介绍,谢谢大家阅读。

                    文章转载自马基雅维利incoding,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                    评论