同步API代码实现
首先我们看一下redisContext
这个数据结构的各个字段的具体含义:
redisContext.err
,用于记录这个连接上的错误码,当连接没有错误时,这个字段会被设置为0。redisContext.errstr
,当连接上发生错误的话,这个字符串字段存储了错误的具体信息。redisContext.fd
,表示这个连接对应的文件描述符。redisContext.flags
,记录这个Redis连接上的状态信息:REDIS_BLOCK
,这个掩码表示当前连接是否为阻塞连接,对于同步API的连接都是阻塞模式,而异步API连接则是使用非阻塞模式。REDIS_CONNECTED
,这一位掩码表示当前的连接是否已经建立,同步API中调用connect
系统调用建立连接之后,或者个异步API中在连接的回调之中会给这个连接设置这个掩码。REDIS_DISCONNECTING
,这个掩码用于异步API之中,当尝试关闭连接的时候,会为连接设置这个掩码。在这种状态下,连接将不会继续接受新的查询命令请求,同时会刷新并清理输出缓冲区之中的数据,并将网络之中读取并解析所有的返回数据。REDIS_FREEING
,这个掩码用于异步API之中,当这个连接要被释放时,会给这个连接设置这个掩码,表示这个连接应该尽快被清理。REDIS_IN_CALLBACK
,这个掩码用于异步API之中,表示这个连接正在执行查询命令的回调函数。REDIS_SUBSCRIBED
,用于异步API之中,表示这个连接有一个或者更多的订阅操作。REDIS_MONITORING
,当使用监视器模式的时候,需要设置这个掩码。REDIS_REUSEADDR
,当对应的连接采用地址重用模式的时候,需要为这个连接设置这个掩码。redisContext.obufs
,这是一个动态的sds
类型的数据,作为连接的输出缓冲区。redisContext.reader
,连接的输入缓冲区,redisReader
这个对象类型集成了对于数据的解析功能。redisContext.connection_type
,记录这个连接所使用的连接类型:REDIS_CONN_TCP
,表明这是一个Socket网络套接字。REDIS_CONN_UNIX
,表明这是一个Unix本地套接字。
建立连接
在deps/hiredis/net.h以及deps/hiredis/net.c这两个文件之中定义了Hiredis库中关于网络连接的一些接口。首先在deps/hiredis/net.c源文件之中,定义了若干关于网络操作的静态函数接口,由于面前对于Redis服务器的底层网络API已经做过介绍,因此这里仅对相关函数进行一个简单的罗列:
| 静态函数接口 | 功能 |
|---|---|
int redisCreateSocket(redisContext *c, int type) | 为redisContext对象创建一个Socket文件描述符 |
void redisContextCloseFd(redisContext *c) | 关闭redisContext对象上的Socket文件描述符 |
int redisSetReuseAddr(redisContext *c) | 设置redisContext对象连接地址可重用 |
int redisSetBlocking(redisContext *c, int blocking) | 设置redisContext对象连接的阻塞状态 |
int redisSetTcpNoDelay(redisContext *c) | 关闭redisContext对象连接上的Nagle设置 |
int redisContextTimeoutMsec(redisContext *c, long *result) | 获取redisContext对象上设置的超时间事件毫秒数 |
除了上面这些基础的静态函数接口之外,deps/hiredis/net.c源文件之中还定义了两个比较重要的静态函数接口, 首先是等待连接就绪的函数接口:
int redisContextWaitReady(redisContext *c, long msec){struct pollfd wfd[1];wfd[0].fd = c->fd;wfd[0].events = POLLOUT;if (errno == EINPROGRESS){int res;if ((res = poll(wfd, 1, msec) == -1)){...}...return REDIS_OK;}return REDIS_ERR;}
这个函数的主要用途便是调用poll
系统调用等待redisContext
连接上的可写事件。我们在前面介绍Redis底层API的时候曾经讲解过的非阻塞connect
操作。而在Hiredis库之中,客户端调用API向服务器发起连接都是采用非阻塞地connect
进行的,因此我们需要一个接口来检查连接上是否触发了可写事件。redisContextWaitReady
这个接口便是为了这个目的而实现的,如果redisContextWaitReady
函数从poll
系统调用的阻塞之中返回,且没有返回错误,那么说明与服务器之间的网络连接已经完全建立;否则便说明连接建立失败。
另外一个接口便是客户端主动向服务器发起连接的函数:
int _redisContextConnectTcp(redisContext *c, const char *addr, int port, const struct timeval *timeout, const char *source_addr){int blocking = (c->flags & REDIS_BLOCK);...s = socket(p->ai_family,p->ai_socktype,p->ai_protocol);c->fd = s;redisSetBlocking(c,0);if (connect(s,p->ai_addr,p->ai_addrlen) == -1) {...else if (errno == EINPROGRESS && !blocking) {}...else{if (redisContextWaitReady(c,timeout_msec) != REDIS_OK)goto error;}}if (blocking && redisSetBlocking(c,1) != REDIS_OK)goto error;}
通过上面这个代码片段,我们可以发现不论redisContext.flags
字段上是否携带了REDIS_BLOCK
标记,客户端与服务器之间建立连接的过程都是采用非阻塞connect
的调用来进行的,而在前面对于redisContext
数据结构的讲解中,我们也介绍了REDIS_BLOCK
掩码使用表示同步API还是异步API的。而在建立连接的过程之中,唯一区别在于同步的API在非阻塞connect
之后,会通过redisContextWaitReady
接口阻塞地等待连接建立;而在异步API之中,则不会通过redisContextWaitReady
函数接口等待连接建立,取而代之的会将这个redisContext
对象加入事件循环之中,通过回调函数来建立连接。
在deps/hiredis/hiredis.h以及deps/hiredis/hiredis.c这两个文件之中,定义了多干个用于发起连接的API:
redisContext *redisConnect(const char *ip, int port);redisContext *redisConnectWithTimeout(const char *ip, int port, const struct timeval tv);redisContext *redisConnectNonBlock(const char *ip, int port);redisContext *redisConnectBindNonBlock(const char *ip, int port, const char *source_addr);redisContext *redisConnectBindNonBlockWithReuse(const char *ip, int port, const char *source_addr);
在这几个API之中redisConnect
以及redisConnectWithTimeout
用于在同步API之中发起连接,而另外三个API则是用于在异步API之中向服务器发起连接。
命令格式化
Hiredis库对于查询命令的格式化以及发送的相关逻辑代码主要定义在deps/hiredis/hiredis.h以及deps/hiredis/hiredis.c这两个代码文件之中。
在redisContext
这个结构体之中,redisContext.obuf
字段代表着这个连接所对应的应用层输出缓冲区,所有的查询命令都会首先被写入这个输出缓冲区,然后再通过write
系统调用写入网络。Hiredis给出了一组用于对输出命令进行格式化的函数接口,这组接口会按照Redis的协议格式对客户端给出的查询命令数据进行格式化:
int redisvFormatCommand(char **target, const char *format, va_list ap);int redisFormatCommand(char **target, const char *format, ...);int redisFormatCommandArgv(char **target, int argc, const char **argv, const size_t *argvlen);int redisFormatSdsCommandArgv(sds *target, int argc, const char ** argv, const size_t *argvlen);
而客户端对于流水线机制的支持,即redisAppendCommand
系列函数便都是借用上面的这些函数接口,将格式化后的查询命令数据写入到应用层缓冲区redisContext.obuf
之中来实现的。以redisvAppendCommand
这个函数接口为例:
int redisvAppendCommand(redisContext *c, const char* format, va_list ap){char * cmd;int len;len = redisvFormatCommand(&cmd, format, ap);...__redisAppendCommand(c, cmd, len);free(cmd);return REDIS_OK;}
这里我们可以看到,redisvAppendCommand
这个函数接口首先会通过redisvFormatCommand
将查询命令的数据格式化到一段内存cmd
之中,在将这段存储着格式化命令数据的内存cmd
追加到redisContext.obuf
输出缓冲区之中。
而非流水线的同步命令查询的redisCommand
系列接口,则是在redisAppendCommand
系列函数接口的基础上来实现的,其本质就是将格式化后的查询命令追加到redisContext.obuf
之后,立即将缓冲区之中的查询命令数据发送到网络,并同步地等待查询命令返回数据,以redisvCommand
接口为例:
void *redisvCommand(redisContext *c, const char *format, va_list ap){if (redisvAppendCommand(c, format, ap) != REDIS_OK)return NULL;return __redisBlockForReply(c);}
数据的发送与接收
在底层将redisContext
输出缓冲区之中的数据发送出去则是通过下面这个接口来实现的:
int redisBufferWrite(redisContext *c, int *done);
这个函数会尝试调用write
系统调用,将redisContext.obuf
之中的数据写入网络的内核输出缓冲区。如果输出失败,函数会返回REDIS_ERR
,否则会返回REDIS_OK
。同时,redisContext.obuf
之中的数据是否已经全部被发送,则会通过done
参数进行返回。
而将数据从内核缓冲区读取到应用层缓冲区的工作,则是由redisBufferRead
这个函数来完成的:
int redisBufferRead(redisContext *c);
除了将数据从内核缓冲区读取到应用层缓冲区之外,redisBufferRead
这个接口还会检测服务器端是否主动断开连接。如果read
系统调用返回,但是读取的字节数为0,那么则意味着服务器端主动将连接断开。此时,会将redisContext.err
这个字段设置为REDIS_ERR_EOF
用以标记连接已经被断开。
上述的两个函数都是使用阻塞的方式调用write
接口以及read
接口来实现数据的发送与接收。而在同步API之中,数据的发送与接收都是通过介绍的redisGetReply
来完成的:
int redisGetReply(redisContext *c, void **reply){if (redisGetReplyFromReader(c, &aux) == REDIS_ERR)return REDIS_ERR;if (aux == NULL && c->flags & REDIS_BLOCK){do {if (redisBufferWrite(c, *wdone) == REDIS_ERR)return REDIS_ERR;} while (!wdone);do {if (redisBufferRead(c) == REDIS_ERR)return REDIS_ERR;if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)return REDIS_ERR;} while (aux == NULL);}}
以上便是关于Hiredis之中同步API的一个简要的介绍,谢谢大家阅读。




