暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

一起撸linux内核18-io多路复用之epoll (03)

囧囧妹 2022-03-04
179

点击上方蓝字【囧囧妹】一起学习,一起成长!

一、开篇

接上一节一起撸linux内核17-io多路复用之epoll (02),我们还有一个最关键的一个函数epoll_wait。
epoll内容比较多,代码也比较多,保持耐心,痛过之后就会涅槃!

示例代码我都放在了https://gitee.com/sunnyshare/linux-examplecode.git


二、epoll_wait

该函数简单些,这里就不把其实现代码放这了,主要是进行一些参数判断,根据epfd取出文件结构然后调用ep_poll,这里来看核心实现ep_poll
    static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
    int maxevents, long timeout)
    {
    int res = 0, eavail, timed_out = 0;
    unsigned long flags;
    long slack = 0;
    wait_queue_t wait;
    ktime_t expires, *to = NULL;


    如果timeout大于0就设置超时时间
    if (timeout > 0) {
    struct timespec end_time = ep_set_mstimeout(timeout);


    slack = select_estimate_accuracy(&end_time);
    to = &expires;
    *to = timespec_to_ktime(end_time);
    } else if (timeout == 0) {
    timeout==0也就是不用等待,此时会跳过中间过程直接执行check_events
    timed_out = 1;
    spin_lock_irqsave(&ep->lock, flags);
    goto check_events;
    }
    fetch_events:
    spin_lock_irqsave(&ep->lock, flags);
    检测是否rdlist是否为空
    if (!ep_events_available(ep)) {
    将当前进程的默认唤醒回调添加到wait队列上
    init_waitqueue_entry(&wait, current);
    将wait队列wq队列上
    __add_wait_queue_exclusive(&ep->wq, &wait);


    for (;;) {
    //当前任务设置为可中断
    set_current_state(TASK_INTERRUPTIBLE);
    //就绪链表不为空或者超时则跳出
    if (ep_events_available(ep) || timed_out)
    break;
    //收到未决信号跳出
    if (signal_pending(current)) {
    res = -EINTR;
    break;
    }


    spin_unlock_irqrestore(&ep->lock, flags);
    //让出cpu,进入休眠
    if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
    timed_out = 1;


    spin_lock_irqsave(&ep->lock, flags);
    }
    //将wait从wq队列删除掉
    __remove_wait_queue(&ep->wq, &wait);
    //当前任务设置为运行态
    __set_current_state(TASK_RUNNING);
    }
    check_events:
    //如果超时为0则直接跳转到此处进行就绪链表是否为空判断
    //如果超时>0则从上面for(;;)中跳出到此处
    eavail = ep_events_available(ep);
    spin_unlock_irqrestore(&ep->lock, flags);
    //res==0且有事件就绪&&ep_send_events返回就绪数量不为0&&未超时
    if (!res && eavail &&
    !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
    goto fetch_events;//上述条件有一个不成立则继续执行fetch_events


    return res;//返回就绪的事件数量
    }
    通过注释我们看出其实就是循环判断rdlist就绪链表是否为空,如果不为空则跳出,将就绪链表中的事件拷贝给用户空间。
    那么,我们应该知道了几个点,第一从用户空间过来的监听fd和事件我们进行了一次拷贝,之后就不再拷贝了,第二只有就绪链表不为空则会发生向用户空间的数据拷贝。我们来看看ep_send_events。
      static int ep_send_events(struct eventpoll *ep,
      struct epoll_event __user *events, int maxevents)
      {
      struct ep_send_events_data esed;
      esed.maxevents = maxevents;
      esed.events = events;
      return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
      }

      该函数的实现比较简单,设置一些参数然后执行ep_scan_ready_list,这里注意传参回调函数ep_send_events_proc,下面先来分析ep_scan_ready_list。
        //扫描就绪链表
        static int ep_scan_ready_list(struct eventpoll *ep,
        int (*sproc)(struct eventpoll *,
        struct list_head *, void *),
        void *priv, int depth, bool ep_locked)
        {
        int error, pwake = 0;
        unsigned long flags;
        struct epitem *epi, *nepi;
        //初始化链表
        LIST_HEAD(txlist);
        if (!ep_locked)
        mutex_lock_nested(&ep->mtx, depth);
        spin_lock_irqsave(&ep->lock, flags);
        //将就绪链表连接到txlist
        list_splice_init(&ep->rdllist, &txlist);
        ep->ovflist = NULL;
        spin_unlock_irqrestore(&ep->lock, flags);
        error = (*sproc)(ep, &txlist, priv);
        spin_lock_irqsave(&ep->lock, flags);
        //当执行sproc回调时,可能有些事件重新进入了poll回调,这里要重新将他们插入就绪链表
        for (nepi = ep->ovflist; (epi = nepi) != NULL;
        nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
        //查看该epitem是否已经在就绪链表
        if (!ep_is_linked(&epi->rdllink)) {
        list_add_tail(&epi->rdllink, &ep->rdllist);
        ep_pm_stay_awake(epi);
        }
        }
        ep->ovflist = EP_UNACTIVE_PTR;
        //将未拷贝到用户空间的就绪链表重新连接到ep就绪链表
        list_splice(&txlist, &ep->rdllist);
        __pm_relax(ep->ws);


        //就绪链表不为空则再次激活epoll_wait
        //为什么会不为空呢?
        //当进行就绪链表拷贝到用户空间的时,极有可能发生事件的触发再次添加到就绪链表
        //当拷贝过程中出现错误或者没有全部拷贝到用户空间
        if (!list_empty(&ep->rdllist)) {
        if (waitqueue_active(&ep->wq))
        wake_up_locked(&ep->wq);
        if (waitqueue_active(&ep->poll_wait))
        pwake++;
        }
        spin_unlock_irqrestore(&ep->lock, flags);
        if (!ep_locked)
        mutex_unlock(&ep->mtx);
        if (pwake)
        ep_poll_safewake(&ep->poll_wait);
        return error;
        }
        这里我们需要注意在进行就绪事件链表拷贝到用户空间后需要再次进行就绪链表是否为空的判断,为什么呢?
        1,当进行就绪链表拷贝到用户空间的时,极有可能发生事件的触发再次添加到就绪链表。
        2,当拷贝过程中出现错误或者没有全部拷贝到用户空间。
        所以需要再次判断就绪链表,然后唤醒资源以及epoll_wait。
         
        我们再来分析那个拷贝的回调函数ep_send_events_proc
          //将就绪事件和数据拷贝给用户空间
          static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
          void *priv)
          {
          struct ep_send_events_data *esed = priv;
          int eventcnt;
          unsigned int revents;
          struct epitem *epi;
          struct epoll_event __user *uevent;
          struct wakeup_source *ws;
          poll_table pt;
          init_poll_funcptr(&pt, NULL);
          //遍历就绪链表eventcnt记录就绪的事件fd数量
          for (eventcnt = 0, uevent = esed->events;
          !list_empty(head) && eventcnt < esed->maxevents;) {
          epi = list_first_entry(head, struct epitem, rdllink);
          ws = ep_wakeup_source(epi);
          if (ws) {
          if (ws->active)
          __pm_stay_awake(ep->ws);
          __pm_relax(ws);
          }
          //将epi从链表中删除
          list_del_init(&epi->rdllink);


          //获取最新的事件数据
          revents = ep_item_poll(epi, &pt);
          //如果有事件发生将events拷贝到用户空间
          if (revents) {
          if (__put_user(revents, &uevent->events) ||
          __put_user(epi->event.data, &uevent->data)) {
          //拷贝发生错误则将epi重新挂载到就绪链表
          list_add(&epi->rdllink, head);
          ep_pm_stay_awake(epi);
          return eventcnt ? eventcnt : -EFAULT;
          }
          eventcnt++;
          uevent++;
          if (epi->event.events & EPOLLONESHOT)
          epi->event.events &= EP_PRIVATE_BITS;
          else if (!(epi->event.events & EPOLLET)) {
          //如果是ET模式,epitem不会再次添加到就绪链表,除非事件再次发生
          //如果是LT模式,则会将epitem重新添加到就绪链表,并唤醒对应的资源,
          //以此来达到下次再次epoll_wait时会将上次未处理事件再次返回
          list_add_tail(&epi->rdllink, &ep->rdllist);
          ep_pm_stay_awake(epi);
          }
          }
          }
          //返回的是就绪事件数量
          return eventcnt;
          }

          该函数需要注意ET和LT模式的区别,如果为LT会将epitem重新放回就绪链表。
           
          好了,我们总结一下epoll,关于epoll内容比较多,需要些耐心一点点的来看,里面也涉及了很多的数据结构,后面我会用一张图来展示这些数据结构的关联。通过epoll我们知道了epoll是不受限于1024个文件描述符的,且在epoll_wait返回后无需再次将监听fd添加到内核,在返回就绪事件时epoll是通过拷贝就绪链表来完成的,这样返回到用户态的数据只有就绪事件,在性能上有一定提升,在事件触发上其实是有一个惊群问题,如何解决呢?可以通过ET模式来解决epoll_wait的惊群,我们后面会在应用层编程中来体现惊群的解决方案。那么selectpollepoll有什么区别呢?在高并发高性能情况下如何选择呢?下一节我们来做个总结,同时将epoll涉及的数据结构关联关系通过一张图来展示一下。


          觉得不错,点击“分享”,“赞”,“在看”传播给更多热爱嵌入式的小伙伴吧!
          文章转载自囧囧妹,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

          评论