暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Redis源码学习(68)-客户端异步API(下)

马基雅维利incoding 2021-07-12
1214

异步API连接的建立

    redisAsyncContext *redisAsyncConnect(const char *ip, int port);
    redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, const char *source_addr);
    redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port, const char *source_addr);

    通过上面的三个接口,我们可以异步地向Redis服务器发起连接,并返回redisAysyncContext
    对象,不过需要注意的是,上述三个函数返回的返回redisAsyncContext
    对象不可以直接使用,因此此时连接没有完全建立。我们需要redisAysncContext
    对象注册onConnect
    回调函数:

      #define _EL_ADD_WRITE(ctx) do { \
      if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \
      } while(0)
      int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn)
      {
      if (ac->onConnect == NULL)
      {
      ac->onConnect = fn;
      _EL_ADD_WRITE(ac);
      return REDIS_OK;
      }
      }

      这里在向redisAsyncContext
      对象注册onConnect
      回调函数时,同时也会向事件驱动之中注册监听可写事件。这样当异步发起的连接被正式建立的时候,会触发Socket文件描述符上的可写事件,在redisAsyncHandleWrite
      这个处理写事件的接口之中,便可以调用我们注册的onConnect
      函数,完成连接的建立。

      异步API命令的发送与回调处理

        int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap);
        int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...);
        int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen);
        int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len);

        上面的四个函数接口是异步API之中用于向Redis服务器异步地发送查询命令的接口。在这些接口之中,查询命令数据会被写入到redisAsyncContext
        对象的应用层输出缓冲区之中,同时通过_EL_ADD_WRITE
        这个宏,调用redisAsyncContext.ev.addWrite
        向事件驱动之中注册监听可写事件。

        这样,当系统内核空间的输出缓冲区有足够空间时,事件驱动库便会触发可写事件的处理函数redisAsyncHandleWrite
        ,将应用层缓冲区之中数据写入内核空间:

          void redisAsyncHandleWrite(redisAsyncContext *ac)
          {
          redisContext *c = &(ac->c);
          int done = 0;
          ...
          if (redisBufferWrite(c, &done) == REDIS_ERR)
          __redisAsyncDisconnect(ac);
          else {
          if (!done)
          _EL_ADD_WRITE(ac);
          else
          _EL_DEL_WRITE(ac);


          _EL_ADD_READ(ac);
          }
          }

          这里会通过redisBufferWrite
          向内核缓冲区之中写入数据。如果数据输出完成,则会从事件驱动库之中将注册的可写事件的监听移除(这是为了防止后续频繁触发连接上的可写事件)。最后还会向事件驱动中注册可读事件的监听_EL_ADD_READ
          ,以期待后续接收从服务器收到的返回数据。

          我们回来再看一下向服务器发送查询命令的接口,调用这些接口除了我们需要给出查询命令的数据之外,我们还可以向接口之中传递一个命令回调函数的函数指针,用于处理这个命令的返回数据。如果传递一个NULL
          值,则意味着我们不关系也不需要处理该命令的返回数据;同时我们可以根据自己的应用的需求传递一个私有的数据privdata
          ,用来配合命令返回数据的回调函数,这个数据如果不需要也可以传递一个NULL
          的值。

          正常情况下,我们执行一个普通的查询命令时,Hiredis会用回调函数redisCallbackFn *fn
          以及用户的私有数据void *privdata
          构造一个redisCallback
          对象,并将之插入到redisAsyncContext.replies
          回调队列之中。前面我们介绍了,在Hiredis之中,普通的查询命令以及发布/订阅命令之间的差异,因此如果我们通过异步API执行订阅命令时,对应的回调对象redisCallback
          不会被插入redisAsyncContext.replies
          队列之中,而是会根据订阅频道还是订阅模式的不同,被插入到redisAsyncContext.sub.channels
          以及redisAsyncContext.sub.patterns
          这两个哈希表之中。

          在服务器处理完用户的查询命令之后,将返回数据发送给客户端,这是在用户一侧,会触发redisAsyncContext
          上的可读事件的回调函数redisAsyncHandleRead

            void redisAsyncHandleRead(redisAsyncContext *ac) {
            redisContext *c = &(ac->c);
            ...
            if (redisBufferRead(c) == REDIS_ERR)
            __redisAsyncDisconnect(ac);
            else {
            _EL_ADD_READ(ac);
            redisProcessCallbacks(ac);
            }
            }

            这个Hiredis在通过redisBufferRead
            函数完成数据读取之后,通过redisProcessCallbacks
            这个接口,执行特定的回调函数处理逻辑。

              void redisProcessCallbacks(redisAsyncContext *ac) {
              redisContext *c = &(ac->c);
              redisCallback cb = {NULL, NULL, NULL};
              void *reply = NULL;
              int status;


              while((status = redisGetReply(c,&reply)) == REDIS_OK) {
              ...
              if (cb.fn != NULL)
              {
              __redisRunCallback(ac,&cb,reply);
              c->reader->fn->freeObjecy(reply);


              if (c->flags & REDIS_FREEING)
              {
              __redisAsyncFree(ac);
              return;
              }
              }
              else {
              c->reader->fn->freeObject(reply);
              }
              }
              ...
              }

              这里我们可以看到,查询命令的返回数据redisReply
              会被Hiredis自动释放,这也就意味着,我们无法取出redisReply
              返回数据,如果我们希望操作这些返回数据,只能通过注册自定义的回调函数的方式来进行。

              异步API连接的断开

              在异步API之中,redisAsyncContext
              对象可以通过下面这个接口主动断开连接,并释放redisAsyncContext
              对象:

                void redisAsyncDisconnect(redisAsyncContext *ac)
                {
                redisContext *c = &(ac->c);
                c->flags |= REDIS_DISCONNECTING;
                if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
                __redisAsyncDisconnect(ac);
                }

                在这个redisAsyncDisconnect
                函数之中,只有redisAsyncContext.replies
                这个挂起的回调队列为空,并且当前的对象没有在处理回调函数时,才会立即调用__redisAsyncDisconnect
                函数断开连接;否则不会立即将连接断开,而是会将连接设置为REDIS_DISCONNECTING
                的状态,在这种状态下redisAsyncContext
                对象不会再接受新的查询命令,但是对于已经进入输出缓冲区之中的查询命令,Hiredis依然会执行查询逻辑以及处理返回数据的回调逻辑。

                redisAsyncContext
                对象处理完全部的挂起回调之后,同样也会调用底层的__redisAsyncDisconnect
                来执行断开连接的处理流程。

                  void redisProcessCallbacks(redisAsyncContext *ac) {
                  redisContext *c = &(ac->c);
                  void *reply = NULL;
                  int status;
                  while((status = redisGetReply(c,&reply)) == REDIS_OK) {


                  }
                  if (status != REDIS_OK)
                  __redisAsyncDisconnect(ac);
                  }

                  同时通过上面读取处理返回数据的redisProcessCallbacks
                  代码片段我们也可以看到,当读取返回数据发生错误的时候,也会调用底层的__redisAsyncDisconnect
                  函数来释放连接。

                  __redisAsyncDisconnect
                  处理断开连接的过程之中会执行如下的逻辑:

                  1. 因为断开连接有可能发生在数据读取的过程之中,因此会存在redisAsyncContext.replies
                    以及redisAsyncContext.sub
                    之中残留有挂起的回调对象的情况。在连接被真正断开之前,首先会依次触发所有被挂起的对调对象,只不过回调函数的reply
                    参数为NULL
                    ,这也意味着我们自定义的回调函数,需要检查reply
                    参数,并对参数为NULL
                    的情况给出相应的处理。

                  2. 清理redisAsyncContext
                    对象在事件驱动模块之中注册的可读与可写事件的监听。

                  3. 如果redisAsyncContext
                    对象定义了onDisconnect
                    回调,则调用该回调函数,执行额外的清理逻辑。

                  4. 最后关闭连接,释放对象的应用层缓冲区。

                  异步API状态迁移

                  在前面介绍Hiredis同步API的文章中,我们了解到redisContext.flags
                  这个字段会用于标记当前与Redis服务器的连接对象的状态信息。由于redisContext
                  同样也是异步API中redisAsyncContext
                  对象的字段,因此这些状态信息对于异步API依然有效,本小节将对异步API之中可能会设计到的状态进行一个简要的介绍:

                  • REDIS_CONNECTED

                    • 该状态表示redisAsyncContext
                      对象与redis服务器之间已经建立起了完整的网络连接,由于是通过异步connect
                      来建立的连接,因此调用redisAsyncConnect
                      接口获得的redisAsyncContext
                      对象是没有REDIS_CONNECTED
                      这个状态的。只有在redisAsycnContext
                      对象在事件驱动模块之中触发了__redisAsyncHandleConnect
                      回调正常地完成连接的建立之后,redisAsyncContext
                      对象才会被设置上REDIS_CONNECTED
                      这个标记。

                  • REDIS_IN_CALLBACK

                    • 该状态表示redisAsyncContext
                      对象正在执行回调函数逻辑,在__redisRunCallback
                      这个接口之中,当我们真正调用回调函数之前,会为redisAsyncContext
                      对象附加上这个状态,当回调函数结束之后,会将这个状态移除。这个状态可以保护redisAsyncContext
                      对象在执行回调函数的过程之中不会被其他的线程断开连接或者释放,在断开连接的函数redisAsyncDisconnect
                      以及释放对象的函数redisAsyncFree
                      之中都会检查这个REDIS_IN_CALLBACK
                      状态。

                  • REDIS_SUBSCRIBED

                    • 该状态表示redisAsyncContext
                      对象正在订阅某个频道或者某个模式,用户在通过__redisAsyncCommand
                      redisAsyncContext
                      对象上执行SUBSCRIBE命令时,会为对象附加上这个状态;而当用户执行UNSUBSCRIBE命令后,REDIS_SUBSCRIBED
                      状态将会被移除。我们在前面介绍订阅/发布机制的时候有讲过服务器端client
                      对象在进入订阅模式之后,将无法执行其他的非订阅/发布的命令。这里当redisAsyncContext
                      对象处于REDIS_SUBSCRIBED
                      时,如果通过__redisAsyncCommand
                      执行一条普通的查询命令时,这个查询命令的回调函数不会被加入到redisAsyncContext.replies
                      队列之中,而是会加入到redisAsyncContext.sub.invalid
                      这个队列里。

                  • REDIS_DISCONNECTING

                    • 该状态表示redisAsyncContext
                      对象处于连接将要被断开的状态,在我们调用redisAsyncDisconnect
                      函数主动断开与Redis服务器之间的连接时,对象会被附加上这个状态标记。拥有这个标记的redisAsyncContext
                      对象无法执行新的查询命令,当这个对象所有被挂起的回调对象全部被处理完的时候,这个对象的连接才会被真的断开。

                  • REDIS_FREEING

                    • 该状态表示redisAsyncContext
                      对象处于将要被释放的状态,在我们调用redisAsyncFree
                      尝试释放一个redisAsyncContext
                      对象的时候,为会对象设置上REDIS_FREEING
                      的状态,在所有挂起的数据处理结束之后,这个对象才会真正地被释放。

                  文章转载自马基雅维利incoding,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                  评论