暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Redis - Reactor 模式的事件驱动源码解析

架构狂人 2021-06-29
362

前言

  上一篇介绍了 Linux/UNIX 中的三种 I/O 复用实现 select
poll
epoll
,本篇将延续性地介绍 redis 服务器中 I/O 复用的具体应用。

  redis 服务器将所有需处理的事件抽象为两种事件:

  1. 文件事件。redis 服务器是通过 Socket 与客户端进行连接和通信的。文件事件就是服务器对 Socket 的 I/O 操作的一种抽象,包括 accept
    recv
    send
    等 I/O 操作。

  2. 时间事件。redis 服务器中需要定时执行或周期性执行的任务,如 serveCron
    函数。

  其中,针对文件事件,redis 服务器基于 I/O 多路复用模型,实现了使用了一种高效的事件驱动模式:Reactor 模式Reactor 模式解耦了文件事件的监听与事件的处理,并划分出了如下四个独立模块:

  1. 事件监听器
  2. 事件队列
  3. 事件分发器
  4. 事件处理器

  事件监听器在 redis 服务器中由 I/O 多路复用程序实现,负责对多个文件描述符的就绪事件进行监听。 I/O 多路复用程序会将本次等待时间内发生的就绪事件加入到一个事件队列当中。然后事件分发器将会从事件队列依次取出文件事件并按其事件类型,分发给相应的事件处理器进行处理。

  以上各个模块相互解耦,事件的监听与处理之间通过事件队列进行解构。而且一种的事件处理器只为一种事件类型服务,符合单一职责原则的设计理念,也是Reactor 模式的设计原则。

  下面,我们将介绍 redis 服务器的事件循环机制,并对 Reactor 模式在 redis 服务器程序中具体实现作出讨论。

文件事件

  在 redis 源码中,文件事件是由 aeFileEvent
结构体所定义,其定义在 ae.h 文件中,如下所示:

typedef struct aeFileEvent {
// 事件类型掩码,值可以是 AE_READABLE 或 AE_WRITABLE ,或者两者相或
int mask; /* one of AE_(READABLE|WRITABLE) */
// 读事件的处理函数
aeFileProc *rfileProc;
// 写事件的处理函数
aeFileProc *wfileProc;
// 多路复用库的私有数据
void *clientData;
} aeFileEvent;

  aeFileEvent
结构体中包含了事件类型掩码 mask
,以及读写事件各自的处理函数指针。下面我们将会看到,redis 事件循环中的文件事件是以 aeFileEvent
结构体数组的形式进行存储的,并以文件描述符作为索引进行快速检索的。

时间事件

  redis 的时间事件是由 aeTimeEvent
结构体所定义,其同样定义在 ae.h 文件中,如下所示:

typedef struct aeTimeEvent {

// 时间事件的唯一标识符
long long id; /* time event identifier. */

// 事件的到达时间
long when_sec; /* seconds */
long when_ms; /* milliseconds */

// 事件处理函数
aeTimeProc *timeProc;

// 事件释放函数
aeEventFinalizerProc *finalizerProc;

// 多路复用库的私有数据
void *clientData;

// 指向下个时间事件结构,形成链表
struct aeTimeEvent *next;

} aeTimeEvent;

  aeTimeEvent
结构体中记录了该时间事件的标识id、到达时间、处理函数指针等信息。redis 事件循环中所有的时间事件是以 aeFileEvent
结构体链表的形式进行存储的,每个 aeFileEvent
结构体中都包含一个指向下一个时间事件的指针 next
。由于时间事件是按时间顺序逐一执行的,因此链表的形式比较适合对时间事件进行检索。

事件循环

  redis 服务器事件循环是依靠一个全局的 aeEventLoop
结构体来存储服务器程序运行过程中的需要监听与处理的文件事件和时间事件。aeEventLoop
结构体定义在 ae.h 文件中,如下所示:

typedef struct aeEventLoop {
// 目前已监听的最大的描述符值
int maxfd; /* highest file descriptor currently registered */
// 可以监听的描述符的上限值,即描述符值不能超过setsize值
int setsize; /* max number of file descriptors tracked */
// 用于生成时间事件 id
long long timeEventNextId;
// 最后一次执行时间事件的时间
time_t lastTime; /* Used to detect system clock skew */
// 已注册的文件事件,容量为setsize的 aeFileEvent 结构体数组
aeFileEvent *events; /* Registered events */
// 已就绪的文件事件,容量为setsize的 aeFiredEvent 结构体数组
aeFiredEvent *fired; /* Fired events */
// 时间事件 aeTimeEvent结构体链表
aeTimeEvent *timeEventHead;
// 事件处理器的开关
int stop;
// 多路复用库的私有数据
void *apidata; /* This is used for polling API specific data */
// 在处理事件前要执行的函数
aeBeforeSleepProc *beforesleep;
} aeEventLoop;

  可以看出 aeEventLoop
结构体通过 events
数组装载了需要监听的文件事件,通过 fired
数组装载了已就绪的文件事件,通过 timeEventHead
链表装载了需要处理的时间事件。另外,aeEventLoop
结构体是由 aeCreateEventLoop
函数创建, aeCreateEventLoop
函数定义在 ae.c 文件中,如下所示:

aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;

// 创建事件状态结构
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;

// 初始化文件事件结构和已就绪文件事件结构
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);

// 初始化时间事件结构
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;

eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */

for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;

err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}

  从 aeCreateEventLoop
可以看到,它为 aeEventLoop
结构体初始化了 events
fired
这两个文件事件数组,其容量为 setsize
。并且也初始化了时间事件链表,表头为 NULL
。另外由于还未有文件事件被注册监听, events
数组中还未有已注册的文件事件, 所以 events
数组中每一项 aeFileEvent
文件事件结构体的事件掩码 mask
都要被初始化为 AE_NONE
maxfd
最大已注册描述符值则初始化为 -1。

  另外,我们注意到,在 aeEventLoop
结构体中有一个 apidata
成员,由于成员是 void*
类型,它可以装入任意类型的数据。redis 服务器使用 I/O 多路复用对文件事件进行监听,并且为各种平台的 I/O 多路复用函数(select
epoll
kqueue
evport
)进行了良好的封装,具有一致的 API,所以 I/O 多路复用的底层实现是可以随意更换。

  在这里,apidata
成员的意义就是为不同的 I/O 多路复用函数提供一个存放其特定私有数据的空间。

  以 epoll
实现的 I/O 多路复用为例(在 ae_epoll.c 文件中可见), aeApiCreate
函数会对 apidata
成员装填入一个 aeApiState
结构体。aeApiState
结构体中存放了一个 epoll
实例的描述符 epfd
,以及一个 epoll_event
结构体数组起始元素的指针 events
,如下所示:

typedef struct aeApiState {
// epoll_event 实例描述符
int epfd;

// 事件槽
struct epoll_event *events;
} aeApiState;

  aeApiState
结构体的 events
成员将会传入 eopl_wait
函数的第二个入参中,当 epoll_wait
调用检测到有事件就绪时,就会将所有已经就绪的事件 epoll_event
结构体从内核事件表拷贝到 events
所指向的数组中。这种就绪事件的返回机制是 epoll
特有的,其他 I/O 多路复用函数的返回机制也各不相同,因此 aeEventLoop.apidata
成员对不同的 I/O 多路复用函数有不同的用途。

  在 aeCreateEventLoop
函数里使用了 aeApiCreate
对 I/O 多路复用私有数据 apidata
进行了初始化。epoll
中的 aeApiCreate
函数实现如下所示:

static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));

if (!state) return -1;

// 初始化事件槽空间
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}

// 创建 epoll 实例
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}

// 赋值给 eventLoop
eventLoop->apidata = state;
return 0;
}

  可以看到,aeApiCreate
函数中为 aeApiState
结构体分配了空间,并为其 events
成员初始化了一块 epoll_event
结构体数组空间,因为在 epoll
中文件描述符的事件是以 epoll_event
结构体的形式存在的。另外,使用 epoll_create
函数创建了一个 epoll
实例的描述符。这些就是 epoll
I/O 复用所需用到的的私有数据。

为事件循环添加文件事件

  redis 服务器程序在初始化时,需要把想要监听的文件事件创建出来,并添加至主事件循环结构体中。 在 ae.c 文件中给出了文件事件 aeFileEvent
的初始化方法,并且可以将所创建的文件事件加入到指定的事件循环结构体 eventLoop
当中:

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)

{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}

if (fd >= eventLoop->setsize) return AE_ERR;

// 取出文件事件结构
aeFileEvent *fe = &eventLoop->events[fd];

// 监听指定 fd 的指定事件
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;

// 设置文件事件类型,以及事件的处理器
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;

// 私有数据
fe->clientData = clientData;

// 如果有需要,更新事件处理器的最大 fd
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;

return AE_OK;
}

  aeCreateFileEvent
函数接收一个需要挂载文件事件的事件循环 eventLoop
,以及需要监听的文件描述符、事件类型掩码、相应的事件处理函数等。

  在 aeCreateFileEvent
函数中,首先进行判断,确保所注册监听的文件描述符值没有超出可以监听的最大值。然后通过文件描述符值从eventLoop->events
数组中索引出对应项,对该项赋予事件类型掩码,并根据事件类型设置读事件或写事件的处理函数指针。最后判断该文件描述符值是否是最大的描述符值,若是则更新最大描述符值 eventLoop->maxfd

  另外注意到,在 中执行了 aeApiAddEvent(eventLoop, fd, mask)
语句,aeApiAddEvent
函数底层实质上调用了 I/O 多路复用函数的事件挂载方法。对于epoll
,则调用 epoll_ctl
来新增文件事件;对于 select
,则调用了 FD_SET
来设置文件事件。

为事件循环添加时间事件

  除了文件事件,redis 服务器程序还需要监听时间事件。在 ae.c 文件中给出了时间事件 aeTimeEvent
的初始化方法,并且同样可以将所的创建时间事件加入到指定的事件循环 eventLoop
当中:

long long aeCreateTimeEvent(
aeEventLoop *eventLoop,
long long milliseconds,
aeTimeProc *proc,
void *clientData,
aeEventFinalizerProc *finalizerProc)

{
// 更新时间计数器
long long id = eventLoop->timeEventNextId++;

// 创建时间事件结构
aeTimeEvent *te;

te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;

// 设置 ID
te->id = id;

// 设定处理事件的时间
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
// 设置事件处理器
te->timeProc = proc;
te->finalizerProc = finalizerProc;
// 设置私有数据
te->clientData = clientData;

// 将新事件放入表头
te->next = eventLoop->timeEventHead;
eventLoop->timeEventHead = te;

return id;
}

  aeCreateFileEvent
函数同样接收一个需要挂载时间事件的事件循环 eventLoop
milliseconds
参数表示时间事件多久后到达 ,还需要提供时间事件的处理函数等。

  在aeCreateFileEvent
函数中,首先会更新 eventLoop->timeEventNextId
,表示现存已有的时间事件数目,并以此作为当前时间事件的唯一id标识。然后为时间事件 aeTimeEvent
结构体分配一块内存空间。调用 aeAddMillisecondsToNow
函数计算当前时间事件的到达时刻,并将到达时刻的秒值与毫秒值设置到时间事件 aeTimeEvent
结构体中的 when_sec
when_ms
成员中。最后为aeTimeEvent
结构体设置了事件处理函数指针,并更新 eventLoop->timeEventHead
时间事件链表的表头。

  特别注意,新加入的时间事件,不管它到达时间为何时,都会将其插入到eventLoop->timeEventHead
链表的表头,这样一来,该链表的时间顺序则是乱序的。如果想要查找出到达时间最近的时间事件,则需要 o(N) 地遍历整个链表才能找出。但是不要担心性能问题,因为在一般情况下,redis 服务器只有一个时间事件 serverCron
,因此该无序链表将会退化成单个指针。

主事件循环的事件挂载

  redis 服务器的初始化程序位于 redis.c 文件中,由 initServer
函数完成。在该函数中,可以看到全局事件循环器 server.el
的创建,并指出了最大可监听的文件事件数量 server.maxclients+REDIS_EVENTLOOP_FDSET_INCR
,默认为 10000 + 32 + 96 :

server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);

  在 redis.c 文件和 networking.c 中可以看到,redis 服务器中所有的文件事件(连接应答、命令请求、命令回复等)和时间事件(serverCron)都被挂载到了 server.el
中:

...
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL)
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL)
aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL)
...
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c)
aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c)
...

主事件循环与事件处理

  redis 服务器程序的主函数 main
也位于 redis.c 文件当中,在 main
函数中首先完成了初始化工作,初始化工作之后正式进入 aeMain
函数进行服务器的主事件循环。aeMain
函数需要接收一个 aeEventLoop
结构体指针作为入参。我们需要将 reids 全局事件循环器 server.el
传入 aeMain
函数中,因为 server.el
中包含了 redis 服务器所有需要监听与处理的事件。

  aeMain
函数的定义位于 ae.c 文件中,aeMain
函数的定义如下所示:

void aeMain(aeEventLoop *eventLoop) {

eventLoop->stop = 0;

while (!eventLoop->stop) {

// 如果有需要在事件处理前执行的函数,则运行它
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);

// 开始处理事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}

  可以看到,aeMain
的事件循环是在 while 循环中不断调用 aeProcessEvents
函数,aeProcessEvents
函数将会处理所传入的事件循环 eventLoop
中文件事件与时间事件。

  aeProcessEvents
函数的实现位于 ae.c 文件中,如下所示:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;

/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */

if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;

// 获取最近的时间事件
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
// 如果时间事件存在的话
// 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
long now_sec, now_ms;

/* Calculate the time missing for the nearest
* timer to fire. */

// 计算距今最近的时间事件还要多久才能达到
// 并将该时间距保存在 tv 结构中
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tvp->tv_sec --;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}

// 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {

// 执行到这一步,说明没有时间事件
// 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度

/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */

if (flags & AE_DONT_WAIT) {
// 设置文件事件不阻塞
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
// 文件事件可以阻塞直到有事件到达为止
tvp = NULL; /* wait forever */
}
}

// 处理文件事件,阻塞时间由 tvp 决定
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
// 从已就绪数组中获取事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;

/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */

// 读事件
if (fe->mask & mask & AE_READABLE) {
// rfired 确保读/写事件只能执行其中一个
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 写事件
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}

processed++;
}
}

/* Check time events */
// 执行时间事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);

return processed; /* return the number of processed file/time events */
}

  aeProcessEvents
函数接收一个事件循环结构体 eventLoop
以及一个 flags
标志。flags
标志表示要处理的事件类型,是文件事件还是时间事件,还是两者的或。如果两种事件都不是,则aeProcessEvents
函数将直接返回 0 。另外,flags
标志还可以相或一个 AE_DONT_WAIT
宏值,表示对文件事件的 I/O 多路复用的监听设置为非阻塞式的。

  在 aeProcessEvents
函数中,我们可以看到:

if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);

  在这一句中,如果 flags
标志设置了需要处理时间事件且不设置文件事件 I/O 多路复用监听为非阻塞式,则会调用 aeSearchNearestTimer
函数从 eventLoop
事件循环的时间事件链表 eventLoop->timeEventHead
中取出最近的一个时间事件 shortest
。在 shortest
结构体变量中记录了该时间事件的达到时间,然后将会调用 aeGetTime
函数获取当前时间,并计算出 shortest
时间事件还有多久发生,该时间距保存在 tv
结构体变量中。

  如果时间距 tv
的值小于 0 ,说明最近的时间事件 shortest
已经达到,需要立刻执行。当同时有文件事件和时间事件到达,redis 服务器将会先处理文件事件,然后再处理时间事件。

  redis 服务器程序通过调用 aeApiPoll
函数对事件循环 eventLoop
中的文件事件的就绪与否进行监听,并设置监听的超时时间为 tvp
, tvp
是一个指向上述时间距变量 tv
的指针。

numevents = aeApiPoll(eventLoop, tvp);

  如果时间距 tv
值小于 0,那么就将其重新赋值为 0 ,并传入 aeApiPoll
函数中。此时 aeApiPoll
函数的超时时间为 0 ,表示非阻塞式执行, aeApiPoll
函数会立马返回。然后检查本轮是否有已就绪的文件事件,有则先处理文件事件,处理完毕后,接着处理时间事件。 如果时间距 tv
值大于 0,那么说明距离最近的时间事件的到来还有一段时间,则将该时间距设置为 aeApiPoll
函数 I/O 多路复用监听的超时时间。aeApiPoll
会最多等待 tv
时间距就会返回,若有文件事件就绪,则会提前返回。

  当 aeApiPoll
函数返回后,其返回值 numevents
表示已就绪的文件事件数目,接下来会进行 for 循环 numevents
次,遍历所有的已就绪文件事件:

for (j = 0; j < numevents; j++) {
// 从已就绪数组中获取事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
...

  在上面代码片段中, eventLoop->fired
数组装载了当前所有的已就绪文件事件aeFiredEvent
结构体。由于 aeFiredEvent
结构体中只记录了描述符值与事件类型掩码,并不记录事件处理函数。因此需要依次从 eventLoop->fired
数组中取出文件事件的描述符值,再用该描述符值在 eventLoop->events
数组中索引出该已就绪的文件事件 aeFileEvent
结构体。

  接下来,将判断该文件事件的类型,取出相应的读或写的处理函数进行执行,如下所示:

int rfired = 0;
if (fe->mask & mask & AE_READABLE) {
// rfired 确保读/写事件只能执行其中一个
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 写事件
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}

  从上面代码可以看出,如果一个文件描述符(或 Socket)既可读又可写,那么服务器程序会先执行读处理函数,再在执行写处理函数。

  某些情况下,读事件与写事件被设置具有相同的处理函数,如果某个描述符同时读写事件就绪,那么其处理函数将会被执行两次。

  为了避免这种情况发生,在上面代码片段中,用 rfired
变量记录本轮是否执行过读操作。如果已执行过读处理函数,且读写事件处理函数相同的话,则不再执行写事件的处理函数。

  当目前所有的已就绪文件事件都被处理完毕后,接下来就会用 processTimeEvents
函数处理 eventLoop
中注册的时间事件, processTimeEvents
函数定义在 ae.c 文件中,如下所示:

static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);

/* If the system clock is moved to the future, and then set back to the
* right value, time events may be delayed in a random way. Often this
* means that scheduled operations will not be performed soon enough.
*
* Here we try to detect system clock skews, and force all the time
* events to be processed ASAP when this happens: the idea is that
* processing events earlier is less dangerous than delaying them
* indefinitely, and practice suggests it is. */

// 通过重置事件的运行时间,
// 防止因时间穿插(skew)而造成的事件处理混乱
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
// 更新最后一次处理时间事件的时间
eventLoop->lastTime = now;

// 遍历链表
// 执行那些已经到达的事件
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;

// 跳过无效事件
if (te->id > maxId) {
te = te->next;
continue;
}

// 获取当前时间
aeGetTime(&now_sec, &now_ms);

// 如果当前时间等于或等于事件的执行时间,那么说明事件已到达,执行这个事件
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;

id = te->id;
// 执行事件处理器,并获取返回值
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
/* After an event is processed our time event list may
* no longer be the same, so we restart from head.
* Still we make sure to don't process events registered
* by event handlers itself in order to don't loop forever.
* To do so we saved the max ID we want to handle.
*
* FUTURE OPTIMIZATIONS:
* Note that this is NOT great algorithmically. Redis uses
* a single time event so it's not a problem but the right
* way to do this is to add the new elements on head, and
* to flag deleted elements in a special way for later
* deletion (putting references to the nodes to delete into
* another linked list). */


// 记录是否有需要循环执行这个事件时间
if (retval != AE_NOMORE) {
// 是的, retval 毫秒之后继续执行这个时间事件
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
// 不,将这个事件删除
aeDeleteTimeEvent(eventLoop, id);
}

// 因为执行事件之后,事件列表可能已经被改变了
// 因此需要将 te 放回表头,继续开始执行事件
te = eventLoop->timeEventHead;
} else {
te = te->next;
}
}
return processed;
}

  从 processTimeEvents
函数中可以看到, processTimeEvents
函数将会检查 eventLoop->timeEventHead
链表中的每一项时间事件的执行时间是否已经到达。若是则调用其事件处理函数 te->timeProc
,否则将继续检查链表下一项。

  需要注意的是,调用 te->timeProc
时间事件函数将会返回一个整数,其取值有两种情况:

  1. 返回 -1 (ae.h/AE_NOMORE
    宏值)
    ,表示该时间事件不是周期性事件,执行完一次后需要调用 aeDeleteTimeEvent
    将其从 eventLoop->timeEventHead
    链表中删除。
  2. 返回一个非 -1 的整数,表示该时间事件是一个周期性事件,该返回值为该事件还有多久再次执行。需要调用 aeAddMillisecondsToNow
    函数来更新该时间事件的到达时间 te->when_sec
    te->when_ms

文件事件的流转

  至此,我们走马观花地浏览 redis 服务器事件循环处理的大致流程。redis 服务器对文件事件的处理,会经历如下的流转步骤:

  1. 文件描述符(Socket)。redis 服务器程序使用文件事件 aeFileEvent
    结构体对文件描述符及其 I/O 操作进行了封装。 aeFileEvent
    中包含了文件描述符、事件类型掩码、事件处理函数等要素。
  2. I/O 多路复用程序。redis 服务器可能需要同时与多个客户端 Socket 建立连接,需要使用 I/O 多路复用 aeApiPoll
    函数监听各个 Socket 上文件事件的就绪状态。aeApiPoll
    函数的内部实质上调用了各个平台的 I/O 多路复用函数(select
    epoll
    kqueue
    evport
    ),这些 I/O 多路复用函数会通过各种数据结构形式 aeApiState
    来返回已就绪的文件事件。aeApiPoll
    函数则会将这些已就绪的文件事件加入至一个已就绪事件队列 aeEventLoop.fired
    当中。
  3. 事件分发程序。在 aeProcessEvents
    函数中,会调用 aeApiPoll
    函数获得已就绪的文件事件队列,然后通过一个  for 循环 以有序、同步、每次一个文件事件的方式,从队列中取出文件事件,传送给文件事件分发器作相应分发处理。文件事件分发程序实质上是该 for 循环中一段 if 条件分支语句,根据每个文件事件的事件类型分流到相应的分支上调用相应的事件处理函数作处理,处理完毕后才会取出下一个文件事件进行分发与处理。
  4. 事件处理程序。文件事件结构体 aeFileEvent
    中包含了读和写操作的处理函数指针 rfileProc
    wfileProc
    ,这就是该文件事件的事件处理程序。经过上述事件分发程序 if 语句的分发,文件事件会进入读或写事件类型对应的分支上,然后调用自身的读或写事件的处理函数进行执行。

事件调度总结

  本篇,我们了解了 redis 服务器事件循环机制的源码实现,redis 服务器以一套融洽的调度方式保障了文件事件与时间事件的交替监听和处理。主要有如下特点:

  1. 在 ae.c 中,文件事件的监听由 aeApiPoll
    函数完成的,aeApiPoll
    函数的最大阻塞等待时间是由最近的时间事件到达时间决定的。这样既可以避免服务器对时间事件进行过于频繁的轮询(忙等待),也可以保证对文件事件的监听不会阻塞太久。

  2. I/O 复用 aeApiPoll
    函数每次会提供一组就绪的文件事件供 redis 服务器去作处理。redis 服务器在处理完一组文件事件后,如果检查到仍未有时间事件到达,那么会再次调用 aeApiPoll
    I/O 复用程序去等待并处理文件事件。随着文件事件的不断执行,时间会逐渐逼近时间事件所设置的到达时间,最终抵达到达时间后,redis 服务器就会去处理该时间事件了。

  3. redis 服务器对文件事件和时间事件的处理是同步、有序、原子的。redis 服务器不会中途中断事件的处理,事件之间也不会相互抢占 CPU 资源。

  4. 一个文件描述符同时可读可写时,会先处理读事件,后处理写事件。在事件循环过程中,即使时间事件已到达,也会优先处理文件事件,再处理时间事件。由于事件之间不会相互抢占,因此时间事件的处理时间会比预设时间稍晚一些。


文章转载自架构狂人,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论