Fork me on GitHub

条形与波浪

随心博客 blog

bars and waves

深入 redis 源码, ae 抽象

作为单线程应用的 redis 不出所料的使用了 I/O 多路复用技术, 在 linux 上是 epoll, 在 BSD 上是 kqueue, 为了统一操作, redis 在这些底层上做了一层抽象, 名为 ae.

ae 的目标

  1. 提供统一的跨平台 I/O 多路复用 API.
  2. 内置 timer 事件.

下面以 epoll 为例, 说明 ae 如何达成这两个目标.

紧贴 epoll 的几个 API

epoll 的几个 API: epoll_create(), epoll_ctl(), epoll_wait() 我们应该已经熟悉了, 在redis 源码中调用这几个函数的地方就是对 epoll 作抽象的地方: ae_epoll.c.

以 epoll_create() 为例, aeApiCreate() 内部调用 epoll_create():

typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}

aeApiCreate() 做了两件事:

  1. 调用底层函数 epoll_create().
  2. 操作 eventLoop 相应字段, 做些数据记录.

实际上, ae 封装 epoll_ctl(), epoll_wait() 均是如法炮制: 在调用底层 API 的同时维护着 eventLoop 这个结构.

那么这个 eventLoop 是什么?

作为 ae 核心的 eventLoop

文件 ae.h 和 ae.c 中包含了 ae 的全部代码, 作为 ae 核心的数据结构 eventLoop 定义如下:

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;

它记录着所有与事件循环有关的信息, 创建 ae 事件循环的 API: aeCreateEventLoop(), 在调用 aeApiCreate() 的同时创建并初始化了一个 eventLoop 结构:

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    eventLoop->flags = 0;
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

创建一个 ae 事件循环后, 我们要做的事情自然是在这个事件循环上添加我们感兴趣的 fd, 调用 aeCreateFileEvent():

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

aeCreateFileEvent() 在调用 aeApiAddEvent() 的同时维护着 eventLoop.

这里要注意到应用的自定义数据作为 clientData 挂在了 aeFileEvent 上, 由 ae 自己来维护, 而不是利用 epoll_data 的机制交给内核维护.

创建完 ae 事件循环, 向事件循环里添加 fd, 这些都完成后, 我们要让进程睡眠, 等待事件的发生. 调用 aeProcessEvents():

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        if (eventLoop->flags & AE_DONT_WAIT) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        }

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            int invert = fe->mask & AE_BARRIER;

            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

函数非常长, 但是它也同它的兄弟 API 同样做了这两件事:

  1. 调用封装完成的多路复用 API, 在这里是 aeApiPoll().
  2. 维护 eventLoop 相应数据.

除此之外, 你可能也注意到了, 这个函数内部里有大片代码是用 time 有关的, 比如 processTimeEvents(), aeSearchNearestTimer() 等等. 这些针对时间相关的处理其实就是 ae 自己实现的 timer 事件的一部分.

ae 的 timer 事件

在 linux 下我们可以使用 timerfd_create() 来创造 timer 事件进行监听, 而为了保持跨平台的特性, ae 选择自己实现 timer 事件.

我们都知道 epoll_wait() 中的参数 timeout 可以设置进程等待时间, ae 的 timer 事件的实现就是利用了这个机制, 上面的代码片段就展示了这一点.

timer 事件的封装与普通的文件事件的封装类似:

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
} aeTimeEvent;

向 ae 中添加一个 timer 事件实际上就是在 timer 事件的链表中添加一个元素, 每个事件, 既链表中的每个元素上都记录着自己的下次激活时间.

aeProcessEvents() 内部在调用 epoll_wait() 时会设置超时时间, 其值就是我们的时间事件链表里, 距离现在最短的时间. 这样就可以保证 timer 事件超时后会尽快得到处理.

小结

redis 作者为了保持 redis 的简洁, 没有选择 libevent 等事件循环库, 而是自己去实现了 ae. ae 完成了它的目标: 在提供给 redis 事件循环机制的同时最大程度的保持了代码的简练.