暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

redis启动分析(事件处理)

自己的设计师 2016-11-18
356


首先借用一张经典的图来描述redis启动的过程:


接下来分析每个过程中涉及到的具体函数,以及实现。

首先来看一个结构体:

/* State of an event based program: the registry of file events and time
 * events that the event loop polls and dispatches. */
typedef struct aeEventLoop {
    int maxfd;                      /* highest file descriptor currently registered */
    int setsize;                    /* max number of file descriptors tracked */
    long long timeEventNextId;      /* id given to the next created time event */
    time_t lastTime;                /* Used to detect system clock skew */
    aeFileEvent *events;            /* Registered events */
    aeFiredEvent *fired;            /* Fired events */
    aeTimeEvent *timeEventHead;     /* head of the linked list of time events */
    int stop;                       /* non-zero ends the aeMain() loop */
    void *apidata;                  /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep; /* optional hook run before each
                                     * aeProcessEvents() call in aeMain() */
} aeEventLoop;


这个结构体是整个I/O Based Event和 Time Base Event通知的核心 。所有的基于IO/Time Base Event的信息都会注册到这个结构体中,一旦有IO或者Time Event到来,通过这个结构注册的信息,找到相关的处理函数就可以进行处理了 。

那么Time/IO Based Event是如何被注册,又是何时被处理,而且是如何处理的呢 ? 带着这三个疑问,我们来具体分析。

一.事件如何被注册

 在服务启动初期,会有如下调用:

initServer()->server.el =

             aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); 

假设设置的maxclients=1000

其中CONFIG_FDSET_INCR定义如下:

/* When configuring the server eventloop, we setup it so

* that the total number of file descriptors we can handle

* are server.maxclients + RESERVED_FDS + a few more to stay

* safe. Since RESERVED_FDS defaults to 32, we add 96 in order

* to make sure of not over provisioning more than 128 fds. */

#define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96)  //CONFIG_FDSET_INCR=32+96=128

其中,server.el定义在redisServer 中:

 struct redisServer {

          …..

          aeEventLoop *el;

  };


aeCreateEventLoop会返回一个指向aeEventLoop的指针,那么aeCreateEventLoop具体做了哪些事情呢?

aeEventLoop *aeCreateEventLoop(int setsize) {

    aeEventLoop *eventLoop;

    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;

    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);

    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);

    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->
setsize = setsize;
    eventLoop->
lastTime = time(NULL);
    eventLoop->
timeEventHead = NULL;
    eventLoop->
timeEventNextId = 0;
    eventLoop->
stop = 0;
    eventLoop->
maxfd = -1;
    eventLoop->
beforesleep = NULL;
   
if (aeApiCreate(eventLoop) == -1) goto err;
   
/* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */

   
for (i = 0; i < setsize; i++)
        eventLoop->
events[i].mask = AE_NONE;
   
return eventLoop;
err:
   
if (eventLoop) {
       
zfree(eventLoop->events);
       
zfree(eventLoop->fired);
       
zfree(eventLoop);
    }
   
return NULL;

}

这段函数可以简单用创建完之后用图形描述,大概是如下的形式:


aeFileEvent和aeFiredEvent和aeApiState结构体如下:

/* File event structure: one entry per tracked file descriptor. */
typedef struct aeFileEvent {
    int mask;              /* one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc; /* handler called when the fd fires as readable */
    aeFileProc *wfileProc; /* handler called when the fd fires as writable */
    void *clientData;      /* opaque pointer passed to both handlers */
} aeFileEvent;


/* A fired event: a descriptor reported by the poll, with what fired on it. */
typedef struct aeFiredEvent {
    int fd;   /* descriptor that fired; used to index the registered events */
    int mask; /* event mask reported for this descriptor */
} aeFiredEvent;


/* Backend-specific polling state.
 * NOTE(review): presumably this is the kqueue flavour of the state stored in
 * aeEventLoop.apidata -- confirm against the backend source. */
typedef struct aeApiState {
    int kqfd;              /* presumably the kqueue descriptor -- TODO confirm */
    struct kevent *events; /* presumably the buffer filled by the poll call */
} aeApiState;


从上面我们可知,经过aeCreateEventLoop的初始化,一个EventLoop基本成型了,包含一个含有1128个aeFileEvent元素的数组和一个含有1128个aeFiredEvent元素的数组(1128 = maxclients 1000 + CONFIG_FDSET_INCR 128)。 同时,通过kqueue(mac上的实现,linux上对应为epoll)初始化了一个文件句柄池,后续只要将相应的socket挂到kqfd下,就相当于将相应的文件事件注册到了EventLoop中。


在initserver创建完EventLoop结构体之后,接下来就是将监听事件首先注册到EventLoop 中 ,这里分两步:

1.创建监听端口的socket

2.将socket 注册到EventLoop 中

1.创建监听的socket:

   /* Open the TCP listening socket for the user commands. */

    if (server.port != 0 &&
       
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
       
exit(1);

   
/* Open the listening Unix domain socket. */
   
if (server.unixsocket != NULL) {
       
unlink(server.unixsocket); /* don't care if this fails */
       
server.sofd = anetUnixServer(server.neterr,server.unixsocket,
           
server.unixsocketperm, server.tcp_backlog);
       
if (server.sofd == ANET_ERR) {

           ...
           
exit(1);
        }
       
anetNonBlock(NULL,server.sofd);
    }

   
/* Abort if there are no listening sockets at all. */
   
if (server.ipfd_count == 0 && server.sofd < 0) {

       ....
       
exit(1);

    }

首先创建IPV4,IPV6监听socket的数组 server.ipfd,然后创建unix socket的监听socket server.sofd。

2.将socket 注册到EventLoop 中

在将监听事件注册到EventLoop之前,会先将时间事件注册到EventLoop中 :

/* Create the timer callback, this is our way to process many

* background operations incrementally, like clients timeout,

* eviction of unaccessed expired keys and so forth. */
 
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
     
serverPanic("Can't create event loop timers.");
     
exit(1);
    }
 
/* Create an event handler for accepting new connections in TCP and Unix
  * domain sockets. */

  
for (j = 0; j < server.ipfd_count; j++) {
       
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
           
acceptTcpHandler,NULL) == AE_ERR)
            {
               
serverPanic(
                   
"Unrecoverable error creating server.ipfd file event.");
            }
    }
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
    
acceptUnixHandler,NULL) == AE_ERR)

       serverPanic("Unrecoverable error creating server.sofd file event.");


   
/* Open the AOF file if needed. */
   
if (server.aof_state == AOF_ON) {
       
server.aof_fd = open(server.aof_filename,
                              
O_WRONLY|O_APPEND|O_CREAT,0644);
       
if (server.aof_fd == -1) {
           
serverLog(LL_WARNING, "Can't open the append-only file: %s",
               
strerror(errno));
           
exit(1);
        }

    }


这里我们看到,初始化时,注册到EventLoop中的总共有三个事件:

1.aeCreateTimeEvent将Time Based Event注册到EventLoop中。

2.aeCreateFileEvent将端口监听事件注册到EventLoop中。

3.aeCreateFileEvent将Unix Socket监听事件注册到EventLoop中。


1.aeCreateTimeEvent将Time Based Event注册到EventLoop中:

aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL)

的具体实现如下 :

/* Register a time event that becomes due `milliseconds` from now.
 *
 * proc, finalizerProc and clientData are stored in the new event as given.
 * The event is pushed at the head of the loop's time event list.
 *
 * Returns the id assigned to the event, or AE_ERR if allocation fails. */
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    /* The id counter advances before the allocation is attempted. */
    long long new_id = eventLoop->timeEventNextId++;
    aeTimeEvent *event = zmalloc(sizeof(*event));

    if (event == NULL) {
        return AE_ERR;
    }

    event->id = new_id;
    aeAddMillisecondsToNow(milliseconds, &event->when_sec, &event->when_ms);
    event->timeProc = proc;
    event->finalizerProc = finalizerProc;
    event->clientData = clientData;

    /* Link in as the new list head. */
    event->next = eventLoop->timeEventHead;
    eventLoop->timeEventHead = event;

    return new_id;
}


其中aeTimeEvent结构体如下 :

/* Time event structure: one node of the loop's singly linked timer list. */
typedef struct aeTimeEvent {
    long long id;                        /* time event identifier. */
    long when_sec;                       /* seconds */
    long when_ms;                        /* milliseconds */
    aeTimeProc *timeProc;                /* handler stored at creation time */
    aeEventFinalizerProc *finalizerProc; /* finalizer stored at creation time */
    void *clientData;                    /* opaque pointer stored at creation time */
    struct aeTimeEvent *next;            /* next event in the list */
} aeTimeEvent;

用图形比较形象的描述如下:


我们可以看到,Time Event是通过单向链表连接在一起,初始化只有一个时间事件,所以这里链表中只有一个Node。

这个Time Event主要做一些expire key的主动清理,clients超时的断开等工作(代码中注释描述如下):

/* Create the timer callback, this is our way to process many background

 * operations incrementally, like clients timeout, eviction of unaccessed

 * expired keys and so forth. */


2.aeCreateFileEvent将端口监听事件注册到EventLoop中。

3.aeCreateFileEvent将Unix Socket监听事件注册到EventLoop中:

(这里假设ipv4监听对应的socket的fd为1,ipv6监听对应的socket的fd为2,unix socket监听对应的socket的fd为3)


这里我们可以看到,监听socket已经放入到了EventLoop的events数组中,对应的callback function分别为acceptTcpHandler和acceptUnixHandler,并且对应的事件已经注册到了kqfd中了。 


二.事件何时被处理以及如何被处理

在启动函数main中(initServer返回之后),有两个函数的调用与监听事件处理有关:

main()

       ——>aeSetBeforeSleepProc(server.el,beforeSleep);  [1]

        ——>aeMain(server.el); [2]

[1] 就不用解释了,主要是设置EventLoop的beforesleep函数指针为beforeSleep 。

这里重点分析[2]  aeMain(server.el); 

首先来看看函数的实现:  

/* Run the event loop until eventLoop->stop becomes non-zero.
 *
 * Each iteration calls the beforesleep hook (when one is installed) and
 * then processes all kinds of events with AE_ALL_EVENTS. */
void aeMain(aeEventLoop *eventLoop) {
    for (eventLoop->stop = 0; !eventLoop->stop; ) {
        if (eventLoop->beforesleep != NULL) {
            eventLoop->beforesleep(eventLoop);
        }
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

1.每次在休眠等待事件处理前,调用eventLoop->beforesleep(eventLoop)。

2.aeProcessEvents(eventLoop, AE_ALL_EVENTS),等待所有的read/write/time event发生,并进行处理 。


这里重点先分析第2步:

首先看代码实现:

/* Poll for file events and dispatch them, then run the time events.
 *
 * Without special flags the function sleeps until some file event
 * fires, or until the nearest time event is due (if any).
 *
 * If flags has neither AE_TIME_EVENTS nor AE_FILE_EVENTS set, the function
 * does nothing and returns 0.
 * If flags has AE_TIME_EVENTS set, processTimeEvents() is called after the
 * poll and the nearest timer bounds how long the poll may sleep.
 * If flags has AE_DONT_WAIT set, the poll uses a zero timeout so the
 * function returns as soon as the already-pending events are handled.
 *
 * The poll itself happens when at least one descriptor is registered
 * (maxfd != -1), or when time events are requested without AE_DONT_WAIT.
 *
 * Returns the number of fired descriptors handled plus whatever
 * processTimeEvents() reports. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    const int want_time = (flags & AE_TIME_EVENTS) != 0;
    const int want_file = (flags & AE_FILE_EVENTS) != 0;
    const int dont_wait = (flags & AE_DONT_WAIT) != 0;
    int processed = 0;

    /* Nothing to do? return ASAP */
    if (!want_time && !want_file) return 0;

    /* We poll even with no file events registered as long as time events
     * are wanted, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 || (want_time && !dont_wait)) {
        aeTimeEvent *nearest = NULL;
        struct timeval tv, *tvp;
        int numevents;

        if (want_time && !dont_wait) {
            nearest = aeSearchNearestTimer(eventLoop);
        }

        if (nearest != NULL) {
            /* How many milliseconds until the nearest time event fires?
             * A timer that is already due yields a zero timeout. */
            long now_sec, now_ms;
            long long ms;

            aeGetTime(&now_sec, &now_ms);
            ms = (nearest->when_sec - now_sec)*1000 +
                 nearest->when_ms - now_ms;
            if (ms < 0) ms = 0;

            tv.tv_sec = ms/1000;
            tv.tv_usec = (ms % 1000)*1000;
            tvp = &tv;
        } else if (dont_wait) {
            /* We have to check for events but must return ASAP:
             * poll with a zero timeout. */
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        } else {
            /* Otherwise we can block: wait forever. */
            tvp = NULL;
        }

        numevents = aeApiPoll(eventLoop, tvp);
        for (int j = 0; j < numevents; j++) {
            int fd = eventLoop->fired[j].fd;
            int mask = eventLoop->fired[j].mask;
            aeFileEvent *fe = &eventLoop->events[fd];
            int rfired = 0;

            /* fe->mask is re-read for each check: an already processed
             * event may have removed an element that fired and that we
             * have not processed yet, so we check the event is still
             * valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop, fd, fe->clientData, mask);
            }

            /* Skip the write handler when the read handler already ran
             * and both are the same function. */
            if ((fe->mask & mask & AE_WRITABLE) &&
                (!rfired || fe->wfileProc != fe->rfileProc)) {
                fe->wfileProc(eventLoop, fd, fe->clientData, mask);
            }
            processed++;
        }
    }

    /* Check time events */
    if (want_time) {
        processed += processTimeEvents(eventLoop);
    }

    return processed; /* return the number of processed file/time events */
}

下面来分析下代码的实现:

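1.如果flags既没有IO事件标识也没有Time事件标识,则直接返回0。否则,如果EventLoop中注册过IO事件(maxfd != -1),或者flags设置了Time事件标识且没有设置AE_DONT_WAIT标识,则进行2,否则进入6。

2.如果设置了Time Base Event 标识,且没有设置AE_DONT_WAIT标识,则从EventLoop的TimeEvent链表中,寻找一个最早到期的TimeEvent(shortest)。

3.如果存在这样的TimeEvent,则计算它的到期时间与当前时间的差距diff(ms)=shortest - now :

  如果diff(ms) <=0 ,则设置等待超时时间为0,否则设置为diff(ms)。如果不存在这样的TimeEvent,则在设置了AE_DONT_WAIT时超时时间为0,否则一直阻塞等待。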

4.等待注册过的IO Event发生(IO Base Event),等待时间为diff(ms)。

5.如果有IO Event到来,则处理相应的事件,否则在diff(ms)等待超时,进入6。

6.如果flags设置了Time Base Event事件标识,则执行相应的Time Base Event处理函数: processTimeEvents(eventLoop)。


总结:

1.在redis对IO Based Event处理过程中,会先将相应的IO事件注册到EventLoop中,一旦后续的Request过来,EventLoop中注册的事件被激活,相应事件处理函数会对该Request进行处理。

2.Time Based Event通过EventLoop的单链表连接在一起,每一次IO Based Event处理完之后,都会调用processTimeEvents处理TimeEvent单链表,如果有已经到期的Time Event需要处理,则进行处理 。

3.本文中涉及到的Time Base Event(serverCron)和具体请求处理的过程没有具体分析,下一篇文章会具体分析整个过程 。












文章转载自自己的设计师,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论