select, poll, epoll的实现分析
select, poll, epoll都是Linux上的IO多路复用机制.知其然知其所以然,为了更好地理解其底层实现,这几天我阅读了这三个系统调用的源码.
以下源代码摘自Linux4.4.0内核.
预备知识
在了解IO多路复用技术之前,首先需要了解Linux内核的3个方面.
1.等待队列waitqueue
等待队列(@ include/linux/wait.h)的队列头(wait_queue_head_t)往往是资源生产者,队列成员(wait_queue_t)往往是资源消费者.当队列头的资源ready后,内核会逐个执行每个队列成员指定的回调函数,来通知它们该资源已经ready了.
2.内核的poll机制
被poll的fd,必须在实现上支持内核的poll技术,比如fd是某个字符设备,或者是一个socket,它必须实现file_operation中的poll操作,这个操作会给该fd分配一个等待队列头.主动poll该fd的进程必须分配一个等待队列成员,并将其添加到该fd的等待队列中去,同时指定该资源ready时的回调函数.拿socket举例,它必须实现一个poll操作,该操作是由发起轮询者(即监听它的进程)主动调用的.这个poll操作必须调用poll_wait(),后者会将发起轮询者作为等待队列成员添加到该socket的等待队列中.当该socket发生状态改变时,就会通过队列头逐个通知所有监听它的进程.
3.epollfd
epollfd其实质就是一个fd.
select
1.源码分析
// @ fs/select.c SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp, fd_set __user *, exp, struct timeval __user *, tvp) { struct timespec end_time, *to = NULL; struct timeval tv; int ret; if (tvp) { // 如果timeout值不为NULL if (copy_from_user(&tv, tvp, sizeof(tv))) // 则把超时值从用户空间拷贝到内核空间. return -EFAULT; to = &end_time; // 计算timespec格式的未来timeout时间. if (poll_select_set_timeout(to, tv.tv_sec + (tv.tv_usec / USEC_PER_SEC), (tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC)) return -EINVAL; } ret = core_sys_select(n, inp, outp, exp, to); // 关键函数 // poll_select_copy_remaining函数的作用是: // 如果有超时值,则把距离超时时间所剩的时间,从内核空间拷贝到用户空间. ret = poll_select_copy_remaining(&end_time, tvp, 1, ret); return ret; } int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timespec *end_time) { fd_set_bits fds; /** @ include/linux/poll.h typedef struct { unsigned long *in, *out, *ex; unsigned long *res_in, *res_out, *res_ex; } fd_set_bits; **/ // 该结构体内部定义的都是指针,指向描述符集合. void *bits; int ret, max_fds; unsigned int size; struct fdtable *fdt; /** @ include/linux/fdtable.h struct fdtable { unsigned int max_fds; struct file __rcu **fd; // current fd array unsigned long *close_on_exec; unsigned long *open_fds; unsigned long *full_fds_bits; struct rcu_head rcu; }; **/ /* Allocate small arguments on the stack to save memory and be faster */ long stack_fds[SELECT_STACK_ALLOC/sizeof(long)]; /** @ include/linux/poll.h #define FRONTEND_STACK_ALLOC 256 #define SELECT_STACK_ALLOC FRONTEND_STACK_ALLOC **/ // 256 / 8 = 32 // 预先在stack中分配小部分的空间,以节省内存且更加快速 ret = -EINVAL; if (n < 0) goto out_nofds; /* max_fds can increase, so grab it once to avoid race */ rcu_read_lock(); fdt = files_fdtable(current->files); // 获取当前进程的文件描述符表 max_fds = fdt->max_fds; rcu_read_unlock(); if (n > max_fds) // 如果传入的n大于当前进程最大的文件描述符,则修改之. n = max_fds; /* * We need 6 bitmaps (in/out/ex for both incoming and outgoing), * since we used fdset we need to allocate memory in units of * long-words. */ /** 需要分配6个bitmap, 用户传入的in, out, ex, 以及返回给用户的res_in, res_out, res_ex. **/ size = FDS_BYTES(n); // 以一个描述符占一个bit来计算,求出传入的描述符需要多少个byte(以long为单位分配byte)。 bits = stack_fds; if (size > sizeof(stack_fds) / 6) { // 因为一共有6个bitmap, 所以除以6,求得每个bitmap所占的byte个数,用以和size比较。 /* Not enough space in on-stack array; must use kmalloc */ ret = -ENOMEM; bits = kmalloc(6 * size, GFP_KERNEL); // 如果预先在stack分配的空间太小,则在heap上申请内存。 if (!bits) goto out_nofds; } // fds结构体的妙用 fds.in = bits; fds.out = bits + size; fds.ex = bits + 2*size; fds.res_in = bits + 3*size; fds.res_out = bits + 4*size; fds.res_ex = bits + 5*size; /** @ include/linux/poll.h static inline int get_fd_set(unsigned long nr, void __user *ufdset, unsigned long *fdset) { nr = FDS_BYTES(nr); if (ufdset) return copy_from_user(fdset, ufdset, nr) ? -EFAULT : 0; memset(fdset, 0, nr); return 0; } get_fd_set的作用是调用copy_from_user函数, 把用户传入的fd_set从用户空间拷贝到内核空间。 **/ if ((ret = get_fd_set(n, inp, fds.in)) || (ret = get_fd_set(n, outp, fds.out)) || (ret = get_fd_set(n, exp, fds.ex))) goto out; // 对返回给用户的fd_set初始化为零 zero_fd_set(n, fds.res_in); zero_fd_set(n, fds.res_out); zero_fd_set(n, fds.res_ex); ret = do_select(n, &fds, end_time); // 关键函数,完成主要的工作。 if (ret < 0) // 出错 goto out; if (!ret) { // 无文件设备就绪,但超时或有未决信号。 ret = -ERESTARTNOHAND; if (signal_pending(current)) goto out; ret = 0; } // 把结果集合拷贝到用户空间. if (set_fd_set(n, inp, fds.res_in) || set_fd_set(n, outp, fds.res_out) || set_fd_set(n, exp, fds.res_ex)) ret = -EFAULT; out: if (bits != stack_fds) // 如果申请了heap上的内存,则释放之. kfree(bits); out_nofds: return ret; } int do_select(int n, fd_set_bits *fds, struct timespec *end_time) { ktime_t expire, *to = NULL; struct poll_wqueues table; /** @ inclue/linux/poll.h struct poll_wqueues { poll_table pt; struct poll_table_page *table; struct task_struct *polling_task; int triggered; int error; int inline_index; struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES]; }; **/ poll_table *wait; /** @ inclue/linux/poll.h typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *); typedef struct poll_table_struct { poll_queue_proc _qproc; unsigned long _key; } poll_table; **/ int retval, i, timed_out = 0; unsigned long slack = 0; unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0; // TODO unsigned long busy_end = 0; rcu_read_lock(); // TODO retval = max_select_fd(n, fds); // 检查fds中fd的有效性(即要求打开),并获得当前最大的fd. rcu_read_unlock(); if (retval < 0) return retval; n = retval; // 初始化poll_wqueues对象table, // 包括初始化poll_wqueues.poll_table._qproc函数指针为__pollwait. poll_initwait(&table); /** @ fs/select.c void poll_initwait(struct poll_wqueues *pwq) { init_poll_funcptr(&pwq->pt, __pollwait); pwq->polling_task = current; pwq->triggered = 0; pwq->error = 0; pwq->table = NULL; pwq->inline_index = 0; } **/ wait = &table.pt; // 如果用户传入的timeout不为NULL,但设定的时间为0, // 则设置poll_table._qproc函数指针为NULL, // 并设置timed_out = 1. if (end_time && !end_time->tv_sec && !end_time->tv_nsec) { wait->_qproc = NULL; timed_out = 1; } // 如果用户传入的timeout不为NULL,且设定的timeout时间不为0, // 则转换timeout时间. if (end_time && !timed_out) slack = select_estimate_accuracy(end_time); retval = 0; for (;;) { unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp; bool can_busy_loop = false; inp = fds->in; outp = fds->out; exp = fds->ex; rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex; for (i = 0; i < n; ++rinp, ++routp, ++rexp) { unsigned long in, out, ex, all_bits, bit = 1, mask, j; unsigned long res_in = 0, res_out = 0, res_ex = 0; // 将in, out, exception进行位或运算,得到all_bits. in = *inp++; out = *outp++; ex = *exp++; all_bits = in | out | ex; // 如果这BITS_PER_LONG个描述符不需要被监听, // 则continue到下一个32个描述符中的循环. if (all_bits == 0) { i += BITS_PER_LONG; continue; } // 本次这BITS_PER_LONG个描述符中有需要监听的. for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) { struct fd f; if (i >= n) // 检测i是否超出了待监听的最大描述符. break; // 每次循环后bit左移一位,用来跳过不需要被监听的描述符. if (!(bit & all_bits)) continue; f = fdget(i); // 获取file结构,并增加其引用计数. if (f.file) { const struct file_operations *f_op; f_op = f.file->f_op; mask = DEFAULT_POLLMASK; if (f_op->poll) { wait_key_set(wait, in, out, // 设置当前描述符待监听的事件掩码. bit, busy_flag); mask = (*f_op->poll)(f.file, wait); // f_op->poll函数执行如下: // 1, 调用poll_wait函数(在include/linux/poll.h); // 2, 检测当前描述符所对应的文件设备的状态,并返回状态掩码mask. // poll_wait函数的定义如下: /** @ include/linux/poll.h static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p) { if (p && p->_qproc && wait_address) p->_qproc(filp, wait_address, p); } **/ // 而由上文的poll_initwait函数可知,p->_qproc指向__pollwait函数. // __pollwait函数的定义如下: /** @ fs/select.c // __pollwait函数的作用是把当前进程添加到当前文件设备的等待队列中. static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p) { struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt); struct poll_table_entry *entry = poll_get_entry(pwq); if (!entry) return; entry->filp = get_file(filp); entry->wait_address = wait_address; entry->key = p->_key; init_waitqueue_func_entry(&entry->wait, pollwake); entry->wait.private = pwq; add_wait_queue(wait_address, &entry->wait); } **/ } fdput(f); // 释放file结构指针,实质是减少其引用计数. // mask是执行f_op->poll函数后所返回的文件设备状态掩码. if ((mask & POLLIN_SET) && (in & bit)) { res_in |= bit; // 描述符对应的文件设备可读 retval++; wait->_qproc = NULL; // 避免重复执行__pollwait函数 } if ((mask & POLLOUT_SET) && (out & bit)) { res_out |= bit; // 描述符对应的文件设备可写 retval++; wait->_qproc = NULL; } if ((mask & POLLEX_SET) && (ex & bit)) { res_ex |= bit; // 描述符对应的文件设备发生error retval++; wait->_qproc = NULL; } /* got something, stop busy polling */ if (retval) { can_busy_loop = false; busy_flag = 0; /* * only remember a returned * POLL_BUSY_LOOP if we asked for it */ } else if (busy_flag & mask) can_busy_loop = true; } } // 根据f_op->poll函数的结果,设置rinp, routp, rexp if (res_in) *rinp = res_in; if (res_out) *routp = res_out; if (res_ex) *rexp = res_ex; cond_resched(); } wait->_qproc = NULL; // 避免重复执行__pollwait函数 // 如果有文件设备就绪,或超时,或有未决信号; // 亦或者发生错误, // 则退出大循环. if (retval || timed_out || signal_pending(current)) break; if (table.error) { retval = table.error; break; } /* only if found POLL_BUSY_LOOP sockets && not out of time */ if (can_busy_loop && !need_resched()) { // TODO if (!busy_end) { busy_end = busy_loop_end_time(); continue; } if (!busy_loop_timeout(busy_end)) continue; } busy_flag = 0; /* * If this is the first loop and we have a timeout * given, then we convert to ktime_t and set the to * pointer to the expiry value. */ if (end_time && !to) { expire = timespec_to_ktime(*end_time); // 转换timeout时间. to = &expire; } // 第一次循环时,当前用户进程在这里进入睡眠. // 超时,poll_schedule_timeout()返回0;被唤醒时返回-EINTR. if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, to, slack)) timed_out = 1; } // 把当前进程从所有文件的等待队列中删掉,并回收内存. poll_freewait(&table); // 返回就绪的文件描述符的个数. return retval; }
2.实现过程
一. 如果用户传入的超时值不为NULL,则把用户空间的timeout对象拷贝到内 核空间,并计算timespec格式的超时时间.
二. 调用core_sys_select函数.过程如下:
- 根据传入的maxfds值,计算出保存所有文件描述符需要多少字节(1个字节占1bit),再根据字节数判断是在栈上还是堆上分配内存(如果字节数太大,则在堆上分配).一共需要分配6个bitmap,用户传入的in,out,ex,以及返回给用户的res_in,res_out,res_ex.
- 将3个用户传入的fdset(in, out, ex)从用户空间拷贝到内核空间,并将3个返回给用户的fdset(res_in, res_out, res_ex)初始化为0.
- 调用do_select函数.过程如下:
3a. 检查传入的fds中fd的有效性.
3b. 调用poll_initwait函数,以初始化poll_wqueues对象table,其中初始化poll_wqueues.poll_table._qproc函数指针为__pollwait.
3c. 如果传入的超时值不为空,但设定的时间为0,则设置poll_table._qproc函数指针为NULL.
3d. 进入大循环.
3e.
将in, out,
ex进行位或运算,得到all_bits.然后遍历all_bits中bit为1的文件描述符,根据当前进程的文件描述符表,获取与当前描述符对应的file结构指针f,并设 置当前描述符待监听的事件掩码,调用f_op->poll函数,该函数会把当前用户进程添加到当前描述符的等待队列中,并获得返回值mask.根据mask设置相应的 返回fdset,执行retval++,并设置wait->_qproc为NULL.
3f. 遍历完所有的文件描述符后,设置wait->_qproc为NULL,并检查是否有文件设备就绪,或超时,或有未决信号,或发生错误,是则退出大循环,执行第8步.
3g. 在第一次循环后,如果设置了超时值,用户进程会调用poll_schedule_timeout进入睡眠.
3h. 最终,调用poll_freewait函数,把当前进程从所有文件的等待队列中删除,回收内存.并返回retval值. - 把res_in, res_out, res_ex从内核空间拷贝到用户空间.并返回ret.
三. 如果设有超时值,则把剩余的超时时间从内核空间拷贝到用户空间.
3.小结
从上述源代码的分析可见,select的低效体现在两个方面:
- 内存复制开销,每次调用select,都要把文件描述符集合从用户空间拷贝到内核空间,又从内核空间拷贝到用户空间.
- 遍历开销,每次调用select,都要在内核中遍历所有传入的文件描述符.
另外select系统调用的第一个参数maxfdp1,其大小被限制在1024内(即FD_SETSIZE),可监听的文件数量有限,这也是select低效的原因.
poll
poll的实现与select大致相同,故不做赘述.区别有两点:
- 文件描述符集合的结构不同,poll使用的是pollfd,而select使用的是fd_set.
- poll没有限制可监听的文件数量.
epoll
1.源码分析
// @ fs/eventpoll.c /* * This structure is stored inside the "private_data" member of the file * structure and represents the main data structure for the eventpoll * interface. */ // 每创建一个epollfd,内核就会分配一个eventpoll与之对应,可以理解成内核态的epollfd. struct eventpoll { /* Protect the access to this structure */ spinlock_t lock; /* * This mutex is used to ensure that files are not removed * while epoll is using them. This is held during the event * collection loop, the file cleanup path, the epoll file exit * code and the ctl operations. */ /** 添加,修改或删除监听fd的时候,以及epoll_wait返回,向用户空间传递数据时,都会持有这个互斥锁. 因此,在用户空间中执行epoll相关操作是线程安全的,内核已经做了保护. **/ struct mutex mtx; /* Wait queue used by sys_epoll_wait() */ /** 等待队列头部. 当在该等待队列中的进程调用epoll_wait()时,会进入睡眠. **/ wait_queue_head_t wq; /* Wait queue used by file->poll() */ /** 用于epollfd被f_op->poll()的时候 **/ wait_queue_head_t poll_wait; /* List of ready file descriptors */ /** 所有已经ready的epitem被存放在这个链表里 **/ struct list_head rdllist; /* RB tree root used to store monitored fd structs */ /** 所有待监听的epitem被存放在这个红黑树里 **/ struct rb_root rbr; /* * This is a single linked list that chains all the "struct epitem" that * happened while transferring ready events to userspace w/out * holding ->lock. */ /** 当event转移到用户空间时,这个单链表存放着所有struct epitem **/ struct epitem *ovflist; /* wakeup_source used when ep_scan_ready_list is running */ struct wakeup_source *ws; // TODO /* The user that created the eventpoll descriptor */ /** 这里存放了一些用户变量,比如fd监听数量的最大值等 **/ struct user_struct *user; struct file *file; /* used to optimize loop detection check */ // TODO int visited; struct list_head visited_list_link; }; /* * Each file descriptor added to the eventpoll interface will * have an entry of this type linked to the "rbr" RB tree. * Avoid increasing the size of this struct, there can be many thousands * of these on a server and we do not want this to take another cache line. */ // epitem表示一个被监听的fd struct epitem { union { /* RB tree node links this structure to the eventpoll RB tree */ /** 红黑树结点,当使用epoll_ctl()将一批fd加入到某个epollfd时,内核会分配一批epitem与fd一一对应, 并且以红黑树的形式来组织它们,tree的root被存放在struct eventpoll中. **/ struct rb_node rbn; /* Used to free the struct epitem */ struct rcu_head rcu; // TODO }; /* List header used to link this structure to the eventpoll ready list */ /** 链表结点,所有已经ready的epitem都会被存放在eventpoll的rdllist链表中. **/ struct list_head rdllink; /* * Works together "struct eventpoll"->ovflist in keeping the * single linked chain of items. */ struct epitem *next; // 用于eventpoll的ovflist /* The file descriptor information this item refers to */ /** epitem对应的fd和struct file **/ struct epoll_filefd ffd; /* Number of active wait queue attached to poll operations */ int nwait; // 当前epitem被加入到多少个等待队列中 /* List containing poll wait queues */ struct list_head pwqlist; /* The "container" of this item */ /** 当前epitem属于那个eventpoll **/ struct eventpoll *ep; /* List header used to link this item to the "struct file" items list */ struct list_head fllink; /* wakeup_source used when EPOLLWAKEUP is set */ struct wakeup_source __rcu *ws; /* The structure that describe the interested events and the source fd */ /** 当前epitem关心哪些event,这个数据是由执行epoll_ctl时从用户空间传递过来的 **/ struct epoll_event event; }; struct epoll_filefd { struct file *file; int fd; } __packed; /* Wait structure used by the poll hooks */ struct eppoll_entry { /* List header used to link this structure to the "struct epitem" */ struct list_head llink; /* The "base" pointer is set to the container "struct epitem" */ struct epitem *base; /* * Wait queue item that will be linked to the target file wait * queue head. */ wait_queue_t wait; /* The wait queue head that linked the "wait" wait queue item */ wait_queue_head_t *whead; }; /* Used by the ep_send_events() function as callback private data */ struct ep_send_events_data { int maxevents; struct epoll_event __user *events; }; /** 调用epoll_create()的实质,就是调用epoll_create1(). **/ SYSCALL_DEFINE1(epoll_create, int, size) { if (size <= 0) return -EINVAL; return sys_epoll_create1(0); } /* * Open an eventpoll file descriptor. */ SYSCALL_DEFINE1(epoll_create1, int, flags) { int error, fd; struct eventpoll *ep = NULL; struct file *file; /* Check the EPOLL_* constant for consistency. */ BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC); /** 对于epoll来说,目前唯一有效的FLAG是CLOSEXEC **/ if (flags & ~EPOLL_CLOEXEC) return -EINVAL; /* * Create the internal data structure ("struct eventpoll"). */ /** 分配一个struct eventpoll,ep_alloc()的具体分析在下面 **/ error = ep_alloc(&ep); if (error < 0) return error; /* * Creates all the items needed to setup an eventpoll file. That is, * a file structure and a free file descriptor. */ fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC)); // TODO if (fd < 0) { error = fd; goto out_free_ep; } /** 创建一个匿名fd. epollfd本身并不存在一个真正的文件与之对应,所以内核需要创建一个"虚拟"的文件,并为之分配 真正的struct file结构,并且具有真正的fd. **/ file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC)); if (IS_ERR(file)) { error = PTR_ERR(file); goto out_free_fd; } ep->file = file; fd_install(fd, file); return fd; out_free_fd: put_unused_fd(fd); out_free_ep: ep_free(ep); return error; } /** 分配一个eventpoll结构 **/ static int ep_alloc(struct eventpoll **pep) { int error; struct user_struct *user; struct eventpoll *ep; /** 获取当前用户的一些信息,比如最大监听fd数目 **/ user = get_current_user(); error = -ENOMEM; ep = kzalloc(sizeof(*ep), GFP_KERNEL); // 话说分配eventpoll对象是使用slab还是用buddy呢?TODO if (unlikely(!ep)) goto free_uid; /** 初始化 **/ spin_lock_init(&ep->lock); mutex_init(&ep->mtx); init_waitqueue_head(&ep->wq); init_waitqueue_head(&ep->poll_wait); INIT_LIST_HEAD(&ep->rdllist); ep->rbr = RB_ROOT; ep->ovflist = EP_UNACTIVE_PTR; ep->user = user; *pep = ep; return 0; free_uid: free_uid(user); return error; } /* * The following function implements the controller interface for * the eventpoll file that enables the insertion/removal/change of * file descriptors inside the interest set. */ /** 调用epool_ctl来添加要监听的fd. 参数说明: epfd,即epollfd op,操作,ADD,MOD,DEL fd,需要监听的文件描述符 event,关心的events **/ SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd, struct epoll_event __user *, event) { int error; int full_check = 0; struct fd f, tf; struct eventpoll *ep; struct epitem *epi; struct epoll_event epds; struct eventpoll *tep = NULL; error = -EFAULT; /** 错误处理以及 将event从用户空间拷贝到内核空间. **/ if (ep_op_has_event(op) && copy_from_user(&epds, event, sizeof(struct epoll_event))) goto error_return; error = -EBADF; /** 获取epollfd的file结构,该结构在epoll_create1()中,由函数anon_inode_getfile()分配 **/ f = fdget(epfd); if (!f.file) goto error_return; /* Get the "struct file *" for the target file */ /** 获取待监听的fd的file结构 **/ tf = fdget(fd); if (!tf.file) goto error_fput; /* The target file descriptor must support poll */ error = -EPERM; /** 待监听的文件一定要支持poll. 话说什么情况下文件不支持poll呢?TODO **/ if (!tf.file->f_op->poll) goto error_tgt_fput; /* Check if EPOLLWAKEUP is allowed */ if (ep_op_has_event(op)) ep_take_care_of_epollwakeup(&epds); /* * We have to check that the file structure underneath the file descriptor * the user passed to us _is_ an eventpoll file. And also we do not permit * adding an epoll file descriptor inside itself. */ error = -EINVAL; /** epollfd不能监听自己 **/ if (f.file == tf.file || !is_file_epoll(f.file)) goto error_tgt_fput; /* * At this point it is safe to assume that the "private_data" contains * our own data structure. */ /** 获取eventpoll结构,来自于epoll_create1()的分配 **/ ep = f.file->private_data; /* * When we insert an epoll file descriptor, inside another epoll file * descriptor, there is the change of creating closed loops, which are * better be handled here, than in more critical paths. While we are * checking for loops we also determine the list of files reachable * and hang them on the tfile_check_list, so we can check that we * haven't created too many possible wakeup paths. * * We do not need to take the global 'epumutex' on EPOLL_CTL_ADD when * the epoll file descriptor is attaching directly to a wakeup source, * unless the epoll file descriptor is nested. The purpose of taking the * 'epmutex' on add is to prevent complex toplogies such as loops and * deep wakeup paths from forming in parallel through multiple * EPOLL_CTL_ADD operations. */ /** 以下操作可能会修改数据结构内容,锁 **/ // TODO mutex_lock_nested(&ep->mtx, 0); if (op == EPOLL_CTL_ADD) { if (!list_empty(&f.file->f_ep_links) || is_file_epoll(tf.file)) { full_check = 1; mutex_unlock(&ep->mtx); mutex_lock(&epmutex); if (is_file_epoll(tf.file)) { error = -ELOOP; if (ep_loop_check(ep, tf.file) != 0) { clear_tfile_check_list(); goto error_tgt_fput; } } else list_add(&tf.file->f_tfile_llink, &tfile_check_list); mutex_lock_nested(&ep->mtx, 0); if (is_file_epoll(tf.file)) { tep = tf.file->private_data; mutex_lock_nested(&tep->mtx, 1); } } } /* * Try to lookup the file inside our RB tree, Since we grabbed "mtx" * above, we can be sure to be able to use the item looked up by * ep_find() till we release the mutex. */ /** 对于每一个监听的fd,内核都有分配一个epitem结构,并且不允许重复分配,所以要查找该fd是否 已经存在. ep_find()即在红黑树中查找,时间复杂度为O(lgN). **/ epi = ep_find(ep, tf.file, fd); error = -EINVAL; switch (op) { /** 首先关心添加 **/ case EPOLL_CTL_ADD: if (!epi) { /** 如果ep_find()没有找到相关的epitem,证明是第一次插入. 在此可以看到,内核总会关心POLLERR和POLLHUP. **/ epds.events |= POLLERR | POLLHUP; /** 红黑树插入,ep_insert()的具体分析在下面 **/ error = ep_insert(ep, &epds, tf.file, fd, full_check); } else /** 如果找到了,则是重复添加 **/ error = -EEXIST; if (full_check) // TODO clear_tfile_check_list(); break; case EPOLL_CTL_DEL: /** 删除 **/ if (epi) error = ep_remove(ep, epi); else error = -ENOENT; break; case EPOLL_CTL_MOD: /** 修改 **/ if (epi) { epds.events |= POLLERR | POLLHUP; error = ep_modify(ep, epi, &epds); } else error = -ENOENT; break; } if (tep != NULL) mutex_unlock(&tep->mtx); mutex_unlock(&ep->mtx); // 解锁 error_tgt_fput: if (full_check) mutex_unlock(&epmutex); fdput(tf); error_fput: fdput(f); error_return: return error; } /* * Must be called with "mtx" held. */ /** ep_insert()在epoll_ctl()中被调用,其工作是往epollfd的红黑树中添加一个待监听fd. **/ static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd, int full_check) { int error, revents, pwake = 0; unsigned long flags; long user_watches; struct epitem *epi; struct ep_pqueue epq; /** struct ep_pqueue的定义如下: @ fs/eventpoll.c // Wrapper struct used by poll queueing struct ep_pqueue { poll_table pt; struct epitem *epi; }; **/ /** 查看是否达到当前用户的最大监听数 **/ user_watches = atomic_long_read(&ep->user->epoll_watches); if (unlikely(user_watches >= max_user_watches)) return -ENOSPC; /** 从slab中分配一个epitem **/ if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) return -ENOMEM; /* Item initialization follow here ... */ /** 相关数据成员的初始化 **/ INIT_LIST_HEAD(&epi->rdllink); INIT_LIST_HEAD(&epi->fllink); INIT_LIST_HEAD(&epi->pwqlist); epi->ep = ep; /** 在该epitem中保存待监听的fd和它的file结构. **/ ep_set_ffd(&epi->ffd, tfile, fd); epi->event = *event; epi->nwait = 0; epi->next = EP_UNACTIVE_PTR; if (epi->event.events & EPOLLWAKEUP) { error = ep_create_wakeup_source(epi); if (error) goto error_create_wakeup_source; } else { RCU_INIT_POINTER(epi->ws, NULL); } /* Initialize the poll table using the queue callback */ epq.epi = epi; /** 初始化一个poll_table, 其实质是指定调用poll_wait()时(不是epoll_wait)的回调函数,以及我们关心哪些event. ep_ptable_queue_proc()就是我们的回调函数,初值是所有event都关心. ep_ptable_queue_proc()的具体分析在下面. **/ init_poll_funcptr(&epq.pt, ep_ptable_queue_proc); /* * Attach the item to the poll hooks and get current event bits. * We can safely use the file* here because its usage count has * been increased by the caller of this function. Note that after * this operation completes, the poll callback can start hitting * the new item. */ revents = ep_item_poll(epi, &epq.pt); /** ep_item_poll()的定义如下: @ fs/eventpoll.c static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt) { pt->_key = epi->event.events; return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events; } **/ /** f_op->poll()一般来说只是个wrapper,它会调用真正的poll实现. 拿UDP的socket来举例,调用流程如下: f_op->poll(),sock_poll(),udp_poll(),datagram_poll(),sock_poll_wait(), 最后调用到上面指定的ep_ptable_queue_proc(). 完成这一步,该epitem就跟这个socket关联起来了,当后者有状态变化时,会通过ep_poll_callback() 来通知. 所以,f_op->poll()做了两件事情: 1.将该epitem和这个待监听的fd关联起来; 2.查询这个待监听的fd是否已经有event已经ready了,有的话就将event返回. **/ /* * We have to check if something went wrong during the poll wait queue * install process. Namely an allocation for a wait queue failed due * high memory pressure. */ error = -ENOMEM; if (epi->nwait < 0) goto error_unregister; /* Add the current item to the list of active epoll hook for this file */ /** 把每个文件和对应的epitem关联起来 **/ spin_lock(&tfile->f_lock); list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links); spin_unlock(&tfile->f_lock); /* * Add the current item to the RB tree. All RB tree operations are * protected by "mtx", and ep_insert() is called with "mtx" held. */ /** 将epitem插入到eventpoll的红黑树中 **/ ep_rbtree_insert(ep, epi); /* now check if we've created too many backpaths */ error = -EINVAL; if (full_check && reverse_path_check()) goto error_remove_epi; /* We have to drop the new item inside our item list to keep track of it */ spin_lock_irqsave(&ep->lock, flags); // TODO /* If the file is already "ready" we drop it inside the ready list */ /** 在这里,如果待监听的fd已经有事件发生,就去处理一下 **/ if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) { /** 将当前的epitem加入到ready list中去 **/ list_add_tail(&epi->rdllink, &ep->rdllist); ep_pm_stay_awake(epi); /* Notify waiting tasks that events are available */ /** 哪个进程在调用epoll_wait(),就唤醒它 **/ if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); /** 先不通知对eventpoll进行poll的进程 **/ if (waitqueue_active(&ep->poll_wait)) pwake++; } spin_unlock_irqrestore(&ep->lock, flags); atomic_long_inc(&ep->user->epoll_watches); /* We have to call this outside the lock */ if (pwake) /** 安全地通知对eventpoll进行poll的进程 **/ ep_poll_safewake(&ep->poll_wait); return 0; error_remove_epi: spin_lock(&tfile->f_lock); list_del_rcu(&epi->fllink); spin_unlock(&tfile->f_lock); rb_erase(&epi->rbn, &ep->rbr); error_unregister: ep_unregister_pollwait(ep, epi); /* * We need to do this because an event could have been arrived on some * allocated wait queue. Note that we don't care about the ep->ovflist * list, since that is used/cleaned only inside a section bound by "mtx". * And ep_insert() is called with "mtx" held. */ spin_lock_irqsave(&ep->lock, flags); if (ep_is_linked(&epi->rdllink)) list_del_init(&epi->rdllink); spin_unlock_irqrestore(&ep->lock, flags); wakeup_source_unregister(ep_wakeup_source(epi)); error_create_wakeup_source: kmem_cache_free(epi_cache, epi); return error; } /* * This is the callback that is used to add our wait queue to the * target file wakeup lists. */ /** 该函数在调用f_op->poll()时被调用. 其作用是当epoll主动poll某个待监听fd时,将epitem和该fd关联起来. 关联的方法是使用等待队列. **/ static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt) { struct epitem *epi = ep_item_from_epqueue(pt); struct eppoll_entry *pwq; /** @ fs/eventpoll.c // Wait structure used by the poll hooks struct eppoll_entry { // List header used to link this structure to the "struct epitem" struct list_head llink; // The "base" pointer is set to the container "struct epitem" struct epitem *base; // Wait queue item that will be linked to the target file wait // queue head. wait_queue_t wait; // The wait queue head that linked the "wait" wait queue item wait_queue_head_t *whead; }; **/ if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) { /** 初始化等待队列,指定ep_poll_callback()为唤醒时的回调函数. 当监听的fd发生状态改变时,即队列头被唤醒时,指定的回调函数会被调用. **/ init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); // ep_poll_callback()的具体分析在下面 pwq->whead = whead; pwq->base = epi; add_wait_queue(whead, &pwq->wait); list_add_tail(&pwq->llink, &epi->pwqlist); epi->nwait++; } else { /* We have to signal that an error occurred */ epi->nwait = -1; } } /* * This is the callback that is passed to the wait queue wakeup * mechanism. It is called by the stored file descriptors when they * have events to report. */ /** 这是一个关键的回调函数. 当被监听的fd发生状态改变时,该函数会被调用. 参数key指向events. **/ static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key) { int pwake = 0; unsigned long flags; struct epitem *epi = ep_item_from_wait(wait); // 从等待队列获取epitem struct eventpoll *ep = epi->ep; spin_lock_irqsave(&ep->lock, flags); /* * If the event mask does not contain any poll(2) event, we consider the * descriptor to be disabled. This condition is likely the effect of the * EPOLLONESHOT bit that disables the descriptor when an event is received, * until the next EPOLL_CTL_MOD will be issued. */ if (!(epi->event.events & ~EP_PRIVATE_BITS)) goto out_unlock; /* * Check the events coming with the callback. At this stage, not * every device reports the events in the "key" parameter of the * callback. We need to be able to handle both cases here, hence the * test for "key" != NULL before the event match test. */ /** 没有我们关心的event **/ if (key && !((unsigned long) key & epi->event.events)) goto out_unlock; /* * If we are transferring events to userspace, we can hold no locks * (because we're accessing user memory, and because of linux f_op->poll() * semantics). All the events that happen during that period of time are * chained in ep->ovflist and requeued later on. */ /** 如果该函数被调用时,epoll_wait()已经返回了, 即此时应用程序已经在循环中获取events了, 这种情况下,内核将此刻发生状态改变的epitem用一个单独的链表保存起来,并且在下一次epoll_wait() 时返回给用户.这个单独的链表就是ovflist. */ if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) { if (epi->next == EP_UNACTIVE_PTR) { epi->next = ep->ovflist; ep->ovflist = epi; if (epi->ws) { /* * Activate ep->ws since epi->ws may get * deactivated at any time. */ __pm_stay_awake(ep->ws); } } goto out_unlock; } /* If this file is already in the ready list we exit soon */ /** 将当前epitem添加到ready list中 **/ if (!ep_is_linked(&epi->rdllink)) { list_add_tail(&epi->rdllink, &ep->rdllist); ep_pm_stay_awake_rcu(epi); } /* * Wake up ( if active ) both the eventpoll wait list and the ->poll() * wait list. */ /** 唤醒调用epoll_wait()的进程 **/ if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); /** 先不通知对eventpoll进行poll的进程 **/ if (waitqueue_active(&ep->poll_wait)) pwake++; out_unlock: spin_unlock_irqrestore(&ep->lock, flags); /* We have to call this outside the lock */ if (pwake) /** 安全地通知对eventpoll进行poll的进程 **/ ep_poll_safewake(&ep->poll_wait); if ((unsigned long)key & POLLFREE) { /* * If we race with ep_remove_wait_queue() it can miss * ->whead = NULL and do another remove_wait_queue() after * us, so we can't use __remove_wait_queue(). */ list_del_init(&wait->task_list); /* * ->whead != NULL protects us from the race with ep_free() * or ep_remove(), ep_remove_wait_queue() takes whead->lock * held by the caller. Once we nullify it, nothing protects * ep/epi or even wait. */ smp_store_release(&ep_pwq_from_wait(wait)->whead, NULL); } return 1; } /* * Implement the event wait interface for the eventpoll file. It is the kernel * part of the user space epoll_wait(2). */ SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events, int, maxevents, int, timeout) { int error; struct fd f; struct eventpoll *ep; /* The maximum number of event must be greater than zero */ if (maxevents <= 0 || maxevents > EP_MAX_EVENTS) return -EINVAL; /* Verify that the area passed by the user is writeable */ /** 内核要验证这一段用户空间的内存是不是有效的,可写的. **/ if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) return -EFAULT; /* Get the "struct file *" for the eventpoll file */ /** 获取epollfd的file结构 **/ f = fdget(epfd); if (!f.file) return -EBADF; /* * We have to check that the file structure underneath the fd * the user passed to us _is_ an eventpoll file. */ error = -EINVAL; /** 检查它是不是一个真正的epollfd **/ if (!is_file_epoll(f.file)) goto error_fput; /* * At this point it is safe to assume that the "private_data" contains * our own data structure. */ /** 获取eventpoll结构 **/ ep = f.file->private_data; /* Time to fish for events ... */ /** 睡眠,等待事件到来. ep_poll()的具体分析在下面. **/ error = ep_poll(ep, events, maxevents, timeout); error_fput: fdput(f); return error; } /** * ep_poll - Retrieves ready events, and delivers them to the caller supplied * event buffer. * * @ep: Pointer to the eventpoll context. * @events: Pointer to the userspace buffer where the ready events should be * stored. * @maxevents: Size (in terms of number of events) of the caller event buffer. * @timeout: Maximum timeout for the ready events fetch operation, in * milliseconds. If the @timeout is zero, the function will not block, * while if the @timeout is less than zero, the function will block * until at least one event has been retrieved (or an error * occurred). * * Returns: Returns the number of ready events which have been fetched, or an * error code, in case of error. */ /** 执行epoll_wait()的进程在该函数进入休眠状态. **/ static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout) { int res = 0, eavail, timed_out = 0; unsigned long flags; long slack = 0; wait_queue_t wait; ktime_t expires, *to = NULL; if (timeout > 0) { /** 计算睡眠时间 **/ struct timespec end_time = ep_set_mstimeout(timeout); slack = select_estimate_accuracy(&end_time); to = &expires; *to = timespec_to_ktime(end_time); } else if (timeout == 0) { /** 已经超时,直接检查ready list **/ /* * Avoid the unnecessary trip to the wait queue loop, if the * caller specified a non blocking operation. */ timed_out = 1; spin_lock_irqsave(&ep->lock, flags); goto check_events; } fetch_events: spin_lock_irqsave(&ep->lock, flags); /** 没有可用的事件,即ready list和ovflist均为空. **/ if (!ep_events_available(ep)) { /* * We don't have any available event to return to the caller. * We need to sleep here, and we will be wake up by * ep_poll_callback() when events will become available. */ /** 初始化一个等待队列成员,current是当前进程. 然后把该等待队列成员添加到ep的等待队列中,即当前进程把自己添加到等待队列中. **/ init_waitqueue_entry(&wait, current); __add_wait_queue_exclusive(&ep->wq, &wait); for (;;) { /* * We don't want to sleep if the ep_poll_callback() sends us * a wakeup in between. That's why we set the task state * to TASK_INTERRUPTIBLE before doing the checks. */ /** 将当前进程的状态设置为睡眠时可以被信号唤醒. 仅仅是状态设置,还没有睡眠. **/ set_current_state(TASK_INTERRUPTIBLE); /** 如果此时,ready list已经有成员了,或者已经超时,则不进入睡眠. **/ if (ep_events_available(ep) || timed_out) break; /** 如果有信号产生,不进入睡眠. **/ if (signal_pending(current)) { res = -EINTR; break; } spin_unlock_irqrestore(&ep->lock, flags); /** 挂起当前进程,等待被唤醒或超时 **/ if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS)) timed_out = 1; spin_lock_irqsave(&ep->lock, flags); } __remove_wait_queue(&ep->wq, &wait); // 把当前进程从该epollfd的等待队列中删除. __set_current_state(TASK_RUNNING); // 将当前进程的状态设置为可运行. } check_events: /* Is it worth to try to dig for events ? */ eavail = ep_events_available(ep); spin_unlock_irqrestore(&ep->lock, flags); /* * Try to transfer events to user space. In case we get 0 events and * there's still timeout left over, we go trying again in search of * more luck. */ /** 如果一切正常,并且有event发生,则拷贝数据给用户空间 **/ // ep_send_events()的具体分析在下面 if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) && !timed_out) goto fetch_events; return res; } static int ep_send_events(struct eventpoll *ep, struct epoll_event __user *events, int maxevents) { struct ep_send_events_data esed; /** @ fs/eventpoll.c // Used by the ep_send_events() function as callback private data struct ep_send_events_data { int maxevents; struct epoll_event __user *events; }; **/ esed.maxevents = maxevents; esed.events = events; // ep_scan_ready_list()的具体分析在下面 return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false); } /** * ep_scan_ready_list - Scans the ready list in a way that makes possible for * the scan code, to call f_op->poll(). Also allows for * O(NumReady) performance. * * @ep: Pointer to the epoll private data structure. * @sproc: Pointer to the scan callback. * @priv: Private opaque data passed to the @sproc callback. * @depth: The current depth of recursive f_op->poll calls. * @ep_locked: caller already holds ep->mtx * * Returns: The same integer error code returned by the @sproc callback. */ static int ep_scan_ready_list(struct eventpoll *ep, int (*sproc)(struct eventpoll *, struct list_head *, void *), void *priv, int depth, bool ep_locked) { int error, pwake = 0; unsigned long flags; struct epitem *epi, *nepi; LIST_HEAD(txlist); /* * We need to lock this because we could be hit by * eventpoll_release_file() and epoll_ctl(). */ if (!ep_locked) mutex_lock_nested(&ep->mtx, depth); /* * Steal the ready list, and re-init the original one to the * empty list. Also, set ep->ovflist to NULL so that events * happening while looping w/out locks, are not lost. We cannot * have the poll callback to queue directly on ep->rdllist, * because we want the "sproc" callback to be able to do it * in a lockless way. */ spin_lock_irqsave(&ep->lock, flags); /** 将ready list上的epitem(即监听事件发生状态改变的epitem)移动到txlist, 并且将ready list清空. **/ list_splice_init(&ep->rdllist, &txlist); /** 改变ovflist的值. 在上面的ep_poll_callback()中可以看到,如果ovflist != EP_UNACTIVE_PTR,当等待队列成员被激活时, 就会将对应的epitem加入到ep->ovflist中,否则加入到ep->rdllist中. 所以这里是为了防止把新来的发生状态改变的epitem加入到ready list中. **/ ep->ovflist = NULL; spin_unlock_irqrestore(&ep->lock, flags); /* * Now call the callback function. */ /** 调用扫描函数处理txlist. 该扫描函数就是ep_send_events_proc.具体分析在下面. **/ error = (*sproc)(ep, &txlist, priv); spin_lock_irqsave(&ep->lock, flags); /* * During the time we spent inside the "sproc" callback, some * other events might have been queued by the poll callback. * We re-insert them inside the main ready-list here. */ /** 在调用sproc()期间,可能会有新的事件发生(被添加到ovflist中),遍历这些发生新事件的epitem, 将它们插入到ready list中. **/ for (nepi = ep->ovflist; (epi = nepi) != NULL; nepi = epi->next, epi->next = EP_UNACTIVE_PTR) { /** @ fs/eventpoll.c #define EP_UNACTIVE_PTR ((void *) -1L) **/ /* * We need to check if the item is already in the list. * During the "sproc" callback execution time, items are * queued into ->ovflist but the "txlist" might already * contain them, and the list_splice() below takes care of them. */ /** epitem不在ready list?插入! **/ if (!ep_is_linked(&epi->rdllink)) { list_add_tail(&epi->rdllink, &ep->rdllist); ep_pm_stay_awake(epi); } } /* * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after * releasing the lock, events will be queued in the normal way inside * ep->rdllist. */ /** 还原ovflist的状态 **/ ep->ovflist = EP_UNACTIVE_PTR; /* * Quickly re-inject items left on "txlist". */ /** 将上次没有处理完的epitem,重新插入到ready list中. **/ list_splice(&txlist, &ep->rdllist); __pm_relax(ep->ws); /** 如果ready list不为空,唤醒. **/ if (!list_empty(&ep->rdllist)) { /* * Wake up (if active) both the eventpoll wait list and * the ->poll() wait list (delayed after we release the lock). */ if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; } spin_unlock_irqrestore(&ep->lock, flags); if (!ep_locked) mutex_unlock(&ep->mtx); /* We have to call this outside the lock */ if (pwake) ep_poll_safewake(&ep->poll_wait); return error; } /** 该函数作为callback在ep_scan_ready_list()中被调用. head是一个链表头,链接着已经ready了的epitem. 这个链表不是eventpoll的ready list,而是上面函数中的txlist. **/ static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv) { struct ep_send_events_data *esed = priv; int eventcnt; unsigned int revents; struct epitem *epi; struct epoll_event __user *uevent; struct wakeup_source *ws; poll_table pt; init_poll_funcptr(&pt, NULL); /* * We can loop without lock because we are passed a task private list. * Items cannot vanish during the loop because ep_scan_ready_list() is * holding "mtx" during this call. */ /** 遍历整个链表 **/ for (eventcnt = 0, uevent = esed->events; !list_empty(head) && eventcnt < esed->maxevents;) { /** 取出第一个结点 **/ epi = list_first_entry(head, struct epitem, rdllink); /* * Activate ep->ws before deactivating epi->ws to prevent * triggering auto-suspend here (in case we reactive epi->ws * below). * * This could be rearranged to delay the deactivation of epi->ws * instead, but then epi->ws would temporarily be out of sync * with ep_is_linked(). */ // TODO ws = ep_wakeup_source(epi); if (ws) { if (ws->active) __pm_stay_awake(ep->ws); __pm_relax(ws); } /** 从ready list中删除该结点 **/ list_del_init(&epi->rdllink); /** 获取ready事件掩码 **/ revents = ep_item_poll(epi, &pt); /** ep_item_poll()的具体分析在上面的ep_insert()中. **/ /* * If the event mask intersect the caller-requested one, * deliver the event to userspace. Again, ep_scan_ready_list() * is holding "mtx", so no operations coming from userspace * can change the item. */ if (revents) { /** 将ready事件和用户传入的数据都拷贝到用户空间 **/ if (__put_user(revents, &uevent->events) || __put_user(epi->event.data, &uevent->data)) { list_add(&epi->rdllink, head); ep_pm_stay_awake(epi); return eventcnt ? eventcnt : -EFAULT; } eventcnt++; uevent++; if (epi->event.events & EPOLLONESHOT) epi->event.events &= EP_PRIVATE_BITS; else if (!(epi->event.events & EPOLLET)) { /** 边缘触发(ET)和水平触发(LT)的区别: 如果是ET,就绪epitem不会再次被加入到ready list中,除非fd再次发生状态改变,ep_poll_callback被调用. 如果是LT,不论是否还有有效的事件和数据,epitem都会被再次加入到ready list中,在下次epoll_wait()时会 立即返回,并通知用户空间.当然如果这个被监听的fd确实没有事件和数据,epoll_wait()会返回一个0. **/ /* * If this file has been added with Level * Trigger mode, we need to insert back inside * the ready list, so that the next call to * epoll_wait() will check again the events * availability. At this point, no one can insert * into ep->rdllist besides us. The epoll_ctl() * callers are locked out by * ep_scan_ready_list() holding "mtx" and the * poll callback will queue them in ep->ovflist. */ list_add_tail(&epi->rdllink, &ep->rdllist); ep_pm_stay_awake(epi); } } } return eventcnt; } /** 该函数在epollfd被close时调用,其工作是释放一些资源. **/ static void ep_free(struct eventpoll *ep) { struct rb_node *rbp; struct epitem *epi; /* We need to release all tasks waiting for these file */ if (waitqueue_active(&ep->poll_wait)) ep_poll_safewake(&ep->poll_wait); /* * We need to lock this because we could be hit by * eventpoll_release_file() while we're freeing the "struct eventpoll". * We do not need to hold "ep->mtx" here because the epoll file * is on the way to be removed and no one has references to it * anymore. The only hit might come from eventpoll_release_file() but * holding "epmutex" is sufficient here. */ mutex_lock(&epmutex); /* * Walks through the whole tree by unregistering poll callbacks. */ for (rbp = rb_first(&ep->rbr); rbp; rbp = rb_next(rbp)) { epi = rb_entry(rbp, struct epitem, rbn); ep_unregister_pollwait(ep, epi); cond_resched(); } /* * Walks through the whole tree by freeing each "struct epitem". At this * point we are sure no poll callbacks will be lingering around, and also by * holding "epmutex" we can be sure that no file cleanup code will hit * us during this operation. So we can avoid the lock on "ep->lock". * We do not need to lock ep->mtx, either, we only do it to prevent * a lockdep warning. */ mutex_lock(&ep->mtx); /** 在epoll_ctl()中被添加的监听fd,在这里被关闭. **/ while ((rbp = rb_first(&ep->rbr)) != NULL) { epi = rb_entry(rbp, struct epitem, rbn); ep_remove(ep, epi); cond_resched(); } mutex_unlock(&ep->mtx); mutex_unlock(&epmutex); mutex_destroy(&ep->mtx); free_uid(ep->user); wakeup_source_unregister(ep->ws); kfree(ep); }
2.实现过程
一. epoll_create
- 调用ep_alloc()来创建一个struct eventpoll对象.ep_alloc()的执行过程如下:
1a. 获取当前用户的一些信息.
1b. 分配一个struct eventpoll对象.
1c. 初始化相关数据成员,如等待队列,就绪链表,红黑树. - 创建一个匿名fd和与之对应的struct file对象.
- 将该eventpoll和该file关联起来,eventpoll对象保存在file对象的private_data指针中.
二. epoll_ctl
- 将event拷贝到内核空间.
- 判断加入的fd是否支持poll操作.
- 根据用户传入的op参数,以及是否在eventpoll的红黑树中找到该fd的结点,来执行相应的操作(插入,删除,修改).拿插入举例,执行ep_insert():
3a. 在slab缓存中分配一个epitem对象,并初始化相关数据成员,如保存待监听的fd和它的file结构.
3b. 指定调用poll_wait()时(再次强调,不是epoll_wait)时的回调函数,用于数据就绪时唤醒进程.(其实质是初始化文件的等待队列,将进程加入到等待队列).
3c. 到此该epitem就和这个待监听的fd关联起来了.
3d. 将该epitem插入到eventpoll的红黑树中.
三. epoll_wait
- 调用ep_poll():
1a. 计算睡眠时间(如果有).
1b. 判断eventpoll的就绪链表是否为空,不为空则直接处理而不是睡眠.
1c. 将当前进程添加到eventpoll的等待队列中.
1d. 进入循环.
1e. 将当前进程设置成TASK_INTERRUPTIBLE状态,然后判断是否有信号到来,如果没有,则进入睡眠.
1f. 如果超时或被信号唤醒,则跳出循环.
1g. 将当前进程从等待队列中删除,并把其状态设置成TASK_RUNNING.
1h. 将数据拷贝给用户空间.拷贝的过程是先把ready list转移到中间链表,然后遍历中间链表拷贝到用户空间,并且判断每个结点是否水平触发,是则再次插入
到ready list.
3.小结
从上述分析可见,相对于select/poll,epoll的高效体现在:
- fd只拷贝一次,即调用epoll_ctl()时把fd拷贝到内核空间.
- epoll轮询的是就绪链表,而不是所有fd.
最后
本以为两天时间就可以分析完这些源码并完成这篇博文,谁知道花了整整一个星期.不过收获匪浅.
在此留下两个问题:
- 因为目前我的知识面有限,上述源码留下一些TODO,日后会一一完善.
- 话说epoll有什么缺点呢?