Fork me on GitHub

条形与波浪

随心博客 blog

bars and waves

从源码理解 epoll

epoll 是许多重要软件的支撑核心, 比如 nginx, Nodejs. epoll 的 api 简单易用, 但是想要用好它却不是那么容易. 本文从源码探究 epoll 的内部原理, 基于 Linux v5.5.13 版本.

前置条件 & 准备工作 & 目标

读者应已经了解 Linux 下 VFS, fd, open file description 和 inode 等基本概念, OO 概念也有助于理解 epoll 内部实现.

本文默认读者已经可以熟练使用 epoll. 如果你还不太熟悉 epoll 及其 api, 我推荐阅读当代最好的 linux 书籍: The Linux Programming Interface: A Linux and UNIX System Programming Handbook 中的 epoll 相关章节和 epoll's man page.

在阅读本文过程中你可能需要翻阅, 搜索 Linux 源码, 我推荐使用这个网站:

https://elixir.bootlin.com/linux/latest/source

读完本文, 你应该对下面这几个问题的答案胸有成竹:

  • epoll 基本原理
  • epoll 为何比 select 和 poll 高效
  • 边缘触发和水平触发的区别
  • epoll 最佳实践

select, poll 和 epoll

select 和 poll 都用于 I/O 多路复用, 它们的用法相似, 实现在同一个文件 fs/select.c.

当用户想要知道多个 fd 中有哪些是可读/可写/发生异常, 便使用系统调用传递一个 fd 数组, 内核把这个数组从用户空间拷贝到内核空间, 并遍历所有 fd, 使用 fd 对应文件的 poll() 操作来探查是否有事件发生. 之后, select 生成一个 bit array 并拷贝到用户空间, poll 是直接操作用户空间的 pollfd 结构.

因为每次都需要遍历一边 fd 数组, select 和 poll 的时间复杂度是 O(n), 当 fd 的数量越来越多时, cpu 的负担也越来越大. 同时处理成千上万连接的服务器如今也非常常见, 为了使 I/O 多路复用更加高效, 我们需要一个比 select 和 poll 更加精巧的实现, epoll 也就应运而生了.

epoll_create 创建核心

fs/eventpoll.c 定义了函数 do_epoll_create, 其调用创建出 epoll 核心数据结构 struct eventpoll:

static int do_epoll_create(int flags)
{
    int error, fd;
    struct eventpoll *ep = NULL;
    struct file *file;

// ...

    error = ep_alloc(&ep);

    fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));

    file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
                 O_RDWR | (flags & O_CLOEXEC));

    ep->file = file;
    fd_install(fd, file);
    return fd;

// ...
}

ep_alloc 分配空间并初始化 struct eventpoll. get_unused_fd_flags 获取一个 fd. anon_inode_getfile 获取一个匿名 inode(file) 的同时, 将 eventpoll 指针挂在了 file.private_data, 这样使得系统在已知 fd 时, 可以轻松根据路径 fd -> file -> eventpoll 获取到 eventpoll. 最后 fd_install 将 fd 和 file 绑定.

记录着 fd 的红黑树

eventpoll.rbr 是一颗红黑树, 其内部记录着所有我们关心的 fd. struct epitem 是红黑树的节点结构, 其内的字段 struct epoll_filefd 作为节点之间比较大小的 key:

struct epoll_filefd {
    struct file *file;
    int fd;
} __packed;

static inline int ep_cmp_ffd(struct epoll_filefd *p1,
                 struct epoll_filefd *p2)
{
    return (p1->file > p2->file ? +1:
            (p1->file < p2->file ? -1 : p1->fd - p2->fd));
}

ep_cmp_ffd 先比较 file 地址大小, 再比较 fd 大小. 这里也就从源码解释了 epoll's man page 内的一句话:

What is the key used to distinguish the file descriptors registered in an interest list?

The key is the combination of the file descriptor number and the open file description (also known as an "open file handle", the kernel's internal representation of an open file).

调用 epoll_ctl() 来添加 fd 时, 内部先调用 ep_find() 确保 fd 之前没有添加过, 然后调用 ep_insert().

另一个 poll()

在分析 ep_insert() 之前, 需要先分清一个概念. 我们在讨论 poll() 时, 经常是指 select/poll 中的 poll, 在这里, 我们讨论另一个 poll(), 挂载在文件上的 poll() 操作.

include/linux/poll.h 中定义了一个 poll() 内使用的重要结构 poll_table_struct:

typedef struct poll_table_struct {
    poll_queue_proc _qproc;
    __poll_t _key;
} poll_table;

其中字段 _qproc 保存一个函数. 字段 _key 存储着我们关心的 event masks, 在 epoll 里 _key 被设置成 ~(__poll_t)0, 表示 epoll 内部关心所有的事件, 之后再由 epoll 负责过滤出我们真正关心的事件.

结构 struct ep_pqueue 用于粘合 poll_table 和 epitem:

struct ep_pqueue {
    poll_table pt;
    struct epitem *epi;
};

传动链条的装载: ep_insert() 和 poll()

ep_insert() 内部首先为 epitem 分配空间并初始化:

if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
  return -ENOMEM;

INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;

然后初始化 struct ep_pqueue epq, 在这里实际粘合起 poll_table 和 epitem. 其中 init_poll_funcptr 设置 poll_table 的字段 _qproc 为 ep_ptable_queue_proc, 字段 _key 为 ~0.

epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

随后 ep_insert() 调用 ep_item_poll():

revents = ep_item_poll(epi, &epq.pt, 1);

ep_item_poll() 内部调用 vfs_poll():

return vfs_poll(epi->ffd.file, pt) & epi->event.events;

vfs_poll() 再调用实际文件对应的 poll(). 我们以 tcp socket 文件对应的 poll() 实现 tcp_poll() 为例.

net/ipv4/tcp.c 中定义了 tcp_poll():

__poll_t tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
    __poll_t mask;
    struct sock *sk = sock->sk;
    const struct tcp_sock *tp = tcp_sk(sk);
    int state;

    sock_poll_wait(file, sock, wait);

// ...
}

tcp_poll() 内部调用 sock_poll_wait(), sock_poll_wait() 内部调用 poll_wait():

static inline void sock_poll_wait(struct file *filp, struct socket *sock,
                  poll_table *p)
{
        poll_wait(filp, &sock->wq.wait, p);

// ...
}

注意到这里 sock_poll_wait() 内部的 &sock->wq.wait, 从 sock 中取出 wait queue head.

上面这一连串调用终于带我们来到了关键函数 poll_wait().

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
    if (p && p->_qproc && wait_address)
        p->_qproc(filp, wait_address, p);
}

还记得我们在 ep_insert() 中已经把 poll_table 内的字段 _qproc 设置成了 ep_ptable_queue_proc. 所以 poll_wait() 内部的 p->_qproc() 实际上就是调用了 ep_ptable_queue_proc().

接下来我们来看 ep_ptable_queue_proc() 内部到底做了什么.

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                 poll_table *pt)
{
    struct epitem *epi = ep_item_from_epqueue(pt);
    struct eppoll_entry *pwq;

    if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
        init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
        pwq->whead = whead;
        pwq->base = epi;
        if (epi->event.events & EPOLLEXCLUSIVE)
            add_wait_queue_exclusive(whead, &pwq->wait);
        else
            add_wait_queue(whead, &pwq->wait);
        list_add_tail(&pwq->llink, &epi->pwqlist);
        epi->nwait++;
    } else {
        /* We have to signal that an error occurred */
        epi->nwait = -1;
    }
}

上面是这个函数的全部内容. 之前我们已经用 ep_pqueue 将 epitem 和 poll_table 粘在了一起, 所以这里 ep_item_from_epqueue() 可以从 poll_table 的地址取出出 epitem.

接下来为 struct eppoll_entry *pwq 分配空间并初始化, 使其将 epitem 和 epitem 对应文件上的 wait queue 粘合在一起. add_wait_queue(whead, &pwq->wait) 在文件的 wait queue 上添加一个新的节点 pwq->wait. init_waitqueue_func_entry(&pwq->wait, ep_poll_callback) 初始化 pwq->wait, 使其上记录了一个重要函数 ep_poll_callback(). list_add_tail(&pwq->llink, &epi->pwqlist) 将 pwq 记录到 epitem 上, epi->nwait 是 epi->pwqlist 的长度, 在一般情况下都是 1.

链条何时转动

文件上的 wait queue 新增了一个节点, 节点内的函数 ep_poll_callback() 会在 Linux 唤醒 wait queue 时调用. 再以 tcp socket 为例, struct sock 内的字段 sk_wq 就存储着其 wait queue, 那么 sk_wq 何时被唤醒?

net/core/sock.c 中定义了函数 sock_init_data():

void sock_init_data(struct socket *sock, struct sock *sk)
{
  sk->sk_data_ready  =   sock_def_readable;
  sk->sk_write_space =  sock_def_write_space;

// ...
}

sock_def_readable() 内调用 wake_up_interruptible_sync_poll(), 然后经过又是一连串的调用, 最后在一个循环中调用了我们之前记录在 pwq->wait 上的函数 ep_poll_callback():

static int __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode,
            int nr_exclusive, int wake_flags, void *key,
            wait_queue_entry_t *bookmark)
{
  wait_queue_entry_t *curr;

  ret = curr->func(curr, mode, wake_flags, key);

// ...
}

这里的 curr 就是 pwq->wait. 对于 sock_def_write_space() 也是同样的执行过程.

那么问题来了, sk->sk_data_ready() 和 sk->sk_write_space() 何时被调用? 这非常依赖于实现, tcp 连接的三次握手成功或着有数据的到来会触发 bottom half 中断从而调用 sk->sk_data_ready(). 内核 write buffer 从满状态转变为非满状态时会调用 sk->sk_write_space(). 这里你是不是联想到了边缘触发这个概念? 别急, 后面我们再讨论边缘触发.

高效核心 ep_poll_callback()

我们说到文件上有事件到来时会调用 ep_poll_callback(), 那么它到底做了些什么.

struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;

read_lock_irqsave(&ep->lock, flags);

ep_poll_callback() 首先从 wait queue entry 中取出 epitem 和 eventpoll, 能够取出也是因为我们提到过的结构 struct eppoll_entry. 然后上锁.

if (pollflags && !(pollflags & epi->event.events))
  goto out_unlock;

过滤 event, 确保有我们所关心的事件发生.

if (READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR) {
  if (epi->next == EP_UNACTIVE_PTR &&
      chain_epi_lockless(epi))
    ep_pm_stay_awake_rcu(epi);
  goto out_unlock;
}

如果用户正在调用 epoll_wait 来将 event 传递到用户空间, 那么这里将此 epitem 挂载到 eventpoll.ovflist 链表上面.

if (!ep_is_linked(epi) &&
    list_add_tail_lockless(&epi->rdllink, &ep->rdllist)) {
  ep_pm_stay_awake_rcu(epi);
}

如果 eventpoll.rdllink 内还没有此 epitem, 就把 epitem 挂载上. eventpoll.rdllink 是一个双向链表, 每个节点都是已经有事件发生的 epitem.

随后唤醒等待着 eventpoll.wq 和 eventpoll.poll_wait 的进程:

if (waitqueue_active(&ep->wq)) {
  if ((epi->event.events & EPOLLEXCLUSIVE) &&
        !(pollflags & POLLFREE)) {
    switch (pollflags & EPOLLINOUT_BITS) {
    case EPOLLIN:
      if (epi->event.events & EPOLLIN)
        ewake = 1;
      break;
    case EPOLLOUT:
      if (epi->event.events & EPOLLOUT)
        ewake = 1;
      break;
    case 0:
      ewake = 1;
      break;
    }
  }
  wake_up(&ep->wq);
}
if (waitqueue_active(&ep->poll_wait))
  pwake++;

out_unlock:
read_unlock_irqrestore(&ep->lock, flags);

if (pwake)
  ep_poll_safewake(&ep->poll_wait);

最后一块拼图 epoll_wait()

epoll_wait() 内部主要内容就是调用了 ep_poll(), ep_poll() 首先根据 timeout 参数确定是阻塞调用还是非阻塞调用:

if (timeout > 0) {
  struct timespec64 end_time = ep_set_mstimeout(timeout);

  slack = select_estimate_accuracy(&end_time);
  to = &expires;
  *to = timespec64_to_ktime(end_time);
} else if (timeout == 0) {
  timed_out = 1;

  write_lock_irq(&ep->lock);
  eavail = ep_events_available(ep);
  write_unlock_irq(&ep->lock);

  goto send_events;
}

在阻塞模式下会进入标签 fetch_events, 探测是否有准备好的 event 可以用来返回:

fetch_events:

if (!ep_events_available(ep))
  ep_busy_loop(ep, timed_out);

eavail = ep_events_available(ep);
if (eavail)
  goto send_events;

如果有, 就进入 send_events 标签, 否则就把当前自身加入到 eventpoll.wq 中:

if (!waiter) {
  waiter = true;
  init_waitqueue_entry(&wait, current);

  write_lock_irq(&ep->lock);
  __add_wait_queue_exclusive(&ep->wq, &wait);
  write_unlock_irq(&ep->lock);
}

然后调用 schedule_hrtimeout_range() 进入睡眠:

for (;;) {
  set_current_state(TASK_INTERRUPTIBLE);
  if (fatal_signal_pending(current)) {
    res = -EINTR;
    break;
  }

  eavail = ep_events_available(ep);
  if (eavail)
    break;
  if (signal_pending(current)) {
    res = -EINTR;
    break;
  }

  if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS)) {
    timed_out = 1;
    break;
  }
}

__set_current_state(TASK_RUNNING);

ep_poll_callback(), 超时, 信号都可以唤醒进程. 唤醒后调用 ep_events_available() 检查是否有 event, 如果有就进入 send_events, 没有就继续睡眠. 这种机制使得这部分代码是在一个无限 for 循环之中.

最后就是标签 send_events 的内容, 尝试将 events 发送到用户空间:

send_events:
if (!res && eavail &&
    !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
  goto fetch_events;

if (waiter) {
  write_lock_irq(&ep->lock);
  __remove_wait_queue(&ep->wq, &wait);
  write_unlock_irq(&ep->lock);
}

return res;

如果之前有将自身加入到 eventpoll.wq, 这里也将其去除. 返回的值 res 就是有事件发生的 epitem 的数量.

把 events 从内核传递到用户空间

send_events 标签内调用 ep_send_events(), 这里注意 events 的值是指向用户空间的指针, 后面会用到. sp_send_event() 内部调用 ep_scan_ready_list():

  ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);

ep_scan_ready_list() 首先将 eventpool.rdllist 内节点全部拿出, 把 eventpool.ovflist 置为 NULL, 然后调用作为参数传递进来的函数 ep_send_events_proc():

write_lock_irq(&ep->lock);
list_splice_init(&ep->rdllist, &txlist);
WRITE_ONCE(ep->ovflist, NULL);
write_unlock_irq(&ep->lock);

res = (*sproc)(ep, &txlist, priv);

在 sproc 执行的过程中可能会有事件发生, 之前我们在 ep_poll_call() 中就探讨过, 这些新的有事件的 epitem 会被挂到 eventpoll.ovflist 上, eventpoll.ovflist 起到了缓存的作用.

将 events 从内核拷贝到用户空间就是由 ep_send_events_proc() 完成的:

list_for_each_entry_safe(epi, tmp, head, rdllink) {
    if (esed->res >= esed->maxevents)
      break;
    revents = ep_item_poll(epi, &pt, 1);
    if (!revents)
      continue;

    if (__put_user(revents, &uevent->events) ||
        __put_user(epi->event.data, &uevent->data)) {
      list_add(&epi->rdllink, head);
      ep_pm_stay_awake(epi);
      if (!esed->res)
        esed->res = -EFAULT;
      return 0;
    }
  }
}

针对每一个 epitem, 先调用 ep_item_poll 最大限度确保确实有事件产生, 然后调用 __put_user 将 events 拷贝到用户空间.

注意我使用了 最大限度 这个词语, 实际上, 在调用 ep_item_poll() 后, 用户着手处理之前这段时间, events 依旧有可能发生了改变, 从而使程序发生意想不到的阻塞. 这也是我们总是将 epoll 和 Non-blocking I/O 搭配使用的缘由之一.

ep_send_events_proc() 最后的一段代码揭示了水平触发和边缘触发的区别:

else if (!(epi->event.events & EPOLLET)) {
  list_add_tail(&epi->rdllink, &ep->rdllist);
  ep_pm_stay_awake(epi);
}

如果是边缘触发, 就把 epitem 重新加入 eventpoll.rdllink, 下次调用 ep_poll() 时总是会检测此 epitem. 这也解释了, 在效率上, 水平触发不如边缘触发.

结语

至此, 我们领略了一遍 epoll 的原理与实现. 核心的 ep_poll_callback() 是 epoll 与 select/poll 之间的根本区别, 同时也揭示了 epoll 高效的根本原因: 在文件上有事件发生时, 由回调函数负责将事件记录在内部结构之中以便后续使用. epoll 的内部实现里还有许多精妙细节我们在这里都没有顾及, 这就需要感兴趣的读者们自己去探索了.

最后, 关于 epoll 与 Non-blocking I/O 这一经典组合的使用, 我这也有一个自认为还不错的例子:

使用 c 与 epoll 实现 socks 代理 https://www.tiaoxingyubolang.com/article/2020-03-22_procurator1