首先借用一张经典的图来描述redis启动的过程: 
接下来分析每个过程中涉及到的具体函数,以及实现。
首先来看一个结构体:
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
这个结构体是整个I/O Based Event和 Time Base Event通知的核心 。所有的基于IO/Time Base Event的信息都会注册到这个结构体中,一旦有IO或者Time Event到来,通过这个结构注册的信息,找到相关的处理函数就可以进行处理了 。
那么Time/IO Based Event是如何被注册,又是何时被处理,而且是如何处理的呢 ? 带着这三个疑问,我们来具体分析。
一.时间如何被注册
在服务启动初期,会有如下调用:
initServer()->server.el =
aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
假设设置的maxclients=1000
其中CONFIG_FDSET_INCR定义如下:
/* When configuring the server eventloop, we setup it so
* that the total number of file descriptors we can handle
* are server.maxclients + RESERVED_FDS + a few more to stay
* safe. Since RESERVED_FDS defaults to 32, we add 96 in order
* to make sure of not over provisioning more than 128 fds. */
#define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96) //CONFIG_FDSET_INCR=32+96=128
其中,server.el定义在redisServer 中:
struct redisServer {
…..
aeEventLoop *el;
};
aeCreateEventLoop会返回一个指向aeEventLoop的指针,那么aeCreateEventLoop具体做了哪些事情呢?
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
这段函数可以简单用创建完之后用图形描述,大概是如下的形式:

aeFileEvent和aeFiredEvent和aeApiState结构体如下:
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
typedef struct aeApiState {
int kqfd;
struct kevent *events;
} aeApiState;
从上面我们可知,经过aeCreateEventLoop的初始化,一个EventLoop基本成型了,包含一个1128个aeFileEvent元素的数组和一个1128个aeFiredEvent元素的数组。 同时,通过kqueue(mac上的实现,linux上对应为epoll)初始化了一个文件句柄池,后续需要将相应的socket只要挂到kqfd下,就将相应的文件事件注册到了EventLoop中了。
在initserver创建完EventLoop结构体之后,接下来就是将监听事件首先注册到EventLoop 中 ,这里分两步:
1.创建监听端口的socket
2.将socket 注册到EventLoop 中
1.创建监听的socket:
/* Open the TCP listening socket for the user commands. */
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
/* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) {
unlink(server.unixsocket); /* don't care if this fails */
server.sofd = anetUnixServer(server.neterr,server.unixsocket,
server.unixsocketperm, server.tcp_backlog);
if (server.sofd == ANET_ERR) {
...
exit(1);
}
anetNonBlock(NULL,server.sofd);
}
/* Abort if there are no listening sockets at all. */
if (server.ipfd_count == 0 && server.sofd < 0) {
....
exit(1);
}
首先创建IPV4,IPV6监听socket的数组 server.ipfd,然后创建unix socket的监听socket server.sofd。
2.将socket 注册到EventLoop 中
在将监听事件注册到EventLoop之前会先将时间时间注册到EventLoop中 :
/* Create the timer callback, this is our way to process many
* background operations incrementally, like clients timeout,
* eviction of unaccessed expired keys and so forth. */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR)
serverPanic("Unrecoverable error creating server.sofd file event.");
/* Open the AOF file if needed. */
if (server.aof_state == AOF_ON) {
server.aof_fd = open(server.aof_filename,
O_WRONLY|O_APPEND|O_CREAT,0644);
if (server.aof_fd == -1) {
serverLog(LL_WARNING, "Can't open the append-only file: %s",
strerror(errno));
exit(1);
}
}
这里我们看到,初始化时,注册到EventLoop中的总共有三个事件:
1.aeCreateTimeEvent将Time Based Event注册到EventLoop中。
2.aeCreateFileEvent将端口监听事件注册到EventLoop中。
3.aeCreateFileEvent将Unix Socket监听事件注册到EventLoop中。
1.aeCreateTimeEvent将Time Based Event注册到EventLoop中:
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL)
的具体实现如下 :
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->next = eventLoop->timeEventHead;
eventLoop->timeEventHead = te;
return id;
}
其中aeTimeEvent结构体如下 :
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *next;
} aeTimeEvent;
用图形比较形象的描述如下:

我们可以看到,Time Event是通过单向链表连接在一起,初始化只有一个时间事件,所以这里链表中只有一个Node。
这个Time Event主要做一些expire key的主动清理,clients超时的断开等工作(代码中注释描述如下):
/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
* expired keys and so forth. */
2.aeCreateFileEvent将端口监听事件注册到EventLoop中。
3.aeCreateFileEvent将Unix Socket监听事件注册到EventLoop中:
(这里假设ipv4监听对应的socket的fd为1,ipv6监听对应的socket的fd为2,unix socket监听对应的socket的fd为3)

这里我们可以看到,监听socket已经放入到了EventLoop的events数组中,且对应的callback function为acceptTcpHandler和acceptUnixHandler。并且对应的事件已经注册到了fqfd中了。
二.事件何时被处理以及如何被处理
在启动函数initserver中,有两个函数的调用监听事件处理有关:
initserver()
——>aeSetBeforeSleepProc(server.el,beforeSleep); [1]
——>aeMain(server.el); [2]
[1] 就不用解释了,主要是设置EventLoop的beforeSleep函数指针为beforeSleep 。
这里重点分析[2] aeMain(server.el);
首先来看看函数的实现:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
1.每次在休眠等待事件处理前,调用eventLoop->beforesleep(eventLoop)。
2.aeProcessEvents(eventLoop, AE_ALL_EVENTS),等待所有的read/write/time event发生,并进行处理 。
这里重点先分析第2步:
首先看代码实现:
/* Process every pending time event, then every pending file event
* (that may be registered by time event callbacks just processed).
* Without special flags the function sleeps until some file event
* fires, or when the next time event occurs (if any).
*
* If flags is 0, the function does nothing and returns.
* if flags has AE_ALL_EVENTS set, all the kind of events are processed.
* if flags has AE_FILE_EVENTS set, file events are processed.
* if flags has AE_TIME_EVENTS set, time events are processed.
* if flags has AE_DONT_WAIT set the function returns ASAP until all
* the events that's possible to process without to wait are processed.
*
* The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
/* How many milliseconds we need to wait for the next
* time event to fire? */
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already
* processed event removed an element that fired and we
* still didn't processed, so we check if the event is
still valid. */
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
下面来分析下代码的实现:
1.如果flags有IO事件标识或者有Time事件标识,且flags设置了超时等待标识,则进行2。否则进入6。
2.如果设置了Time Base Event 标识,且时间时间设置了超时等待标识,则从EventLoop的TimeEvent链表中,寻找一个最旧的TimeEvent。
3.如果存在一个最旧的TimeEvent,则获取这个TimeEvent到当前时间的差距diff(ms)=now - shortest :
如果diff(ms) <=0 ,则设置等待超时时间为0,否则设置为diff(ms)。
4.等待注册过的IO Event发生(IO Base Event),等待时间为diff(ms)。
5.如果有IO Event到来,则处理相应的事件,否则在diff(ms)等待超时,进入6。
6.如果flags设置了Time Base Event事件标识,则执行相应的Time Base Event处理函数: processTimeEvents(eventLoop)。
总结:
1.在redis对IO Based Event处理过程中,会先将相应的IO事件注册到EventLoop中,一旦后续的Request过来,EventLoop中注册的事件被激活,相应事件处理函数会对该Request进行处理。
2.Time Event Based通过EventLoop的单链表连接在一起,每一次IO Based Event处理完之后,都会遍历TimeEvent单链表,如果有满足设置的时间间隔的Time Event需要处理,则进行处理 。
3.本文中涉及到的Time Base Event(serverCron)和具体请求处理的过程没有具体分析,下一篇文章会具体分析整个过程 。




