C: How to use an eventfd with level-triggered behavior on epoll?

Posted by oiopk7p5 on 2023-03-22 in Other

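Registering a level-triggered eventfd with epoll_ctl fires only once if the eventfd counter is not decremented. To summarize the problem: I observed that the epoll flags (EPOLLET, EPOLLONESHOT, or none for level-triggered behavior) all behave the same. In other words, they have no effect.
Can you confirm this bug?
I have a multithreaded application. Each thread waits for new events in epoll_wait on the same epollfd. To terminate the application gracefully, all threads must be woken up. My idea was to use an eventfd counter (EFD_SEMAPHORE|EFD_NONBLOCK) to wake them all together, relying on level-triggered epoll behavior. (Never mind the thundering-herd problem with so few file descriptors.)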
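For example, with 4 threads you write 4 to the eventfd. I expect epoll_wait to return immediately, and to keep returning, until the counter has been decremented (read) 4 times. Instead, epoll_wait returns only once per write.
Yes, I have read all the relevant manuals carefully ;)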
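As a baseline, the counter behavior the question relies on can be checked without epoll at all. This is a minimal single-threaded sketch, assuming Linux and glibc's eventfd_read()/eventfd_write() wrappers; drain_semaphore() is a hypothetical helper name. With EFD_SEMAPHORE every successful read returns 1 and decrements the counter by one, and with EFD_NONBLOCK a read on an empty counter fails with EAGAIN instead of blocking.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <sys/eventfd.h>
#include <unistd.h>

/* Hypothetical helper: write `tokens` to a fresh semaphore eventfd, then read
 * until the counter is empty. Returns the number of successful reads, or -1
 * on a setup error. */
static int drain_semaphore(uint64_t tokens)
{
    int fd = eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK);
    if (fd < 0)
        return -1;
    if (eventfd_write(fd, tokens) != 0) {
        close(fd);
        return -1;
    }

    int reads = 0;
    eventfd_t val;
    while (eventfd_read(fd, &val) == 0) {
        assert(val == 1);          /* semaphore mode: every read yields exactly 1 */
        reads++;
    }
    assert(errno == EAGAIN);       /* non-blocking read on a zero counter */
    close(fd);
    return reads;
}
```

So drain_semaphore(4) performs four successful reads, which is the "one read per thread" pattern the question has in mind.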

#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>

static int event_fd = -1;
static int epoll_fd = -1;

void *thread(void *arg)
{
    (void) arg;

    for(;;) {
        struct epoll_event event;
        epoll_wait(epoll_fd, &event, 1, -1);

        /* handle events */
        if(event.data.fd == event_fd && event.events & EPOLLIN) {
            uint64_t val = 0;
            eventfd_read(event_fd, &val);
            break;
        }
    }

    return NULL;
}

int main(void)
{
    epoll_fd = epoll_create1(0);
    event_fd = eventfd(0, EFD_SEMAPHORE| EFD_NONBLOCK);

    struct epoll_event event;
    event.events = EPOLLIN;
    event.data.fd = event_fd;
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, event_fd, &event);

    enum { THREADS = 4 };
    pthread_t thrd[THREADS];

    for (int i = 0; i < THREADS; i++)
        pthread_create(&thrd[i], NULL, &thread, NULL);

    /* let threads park internally (kernel does readiness check before sleeping) */
    usleep(100000);
    eventfd_write(event_fd, THREADS);

    for (int i = 0; i < THREADS; i++)
        pthread_join(thrd[i], NULL);
}

Answer #1 (gxwragnw)

Writing to an eventfd calls the function eventfd_signal. It contains the following line, which performs the wakeup:

wake_up_locked_poll(&ctx->wqh, EPOLLIN);

wake_up_locked_poll is a macro:

#define wake_up_locked_poll(x, m)                       \
    __wake_up_locked_key((x), TASK_NORMAL, poll_to_key(m))

where __wake_up_locked_key is defined as:

void __wake_up_locked_key(struct wait_queue_head *wq_head, unsigned int mode, void *key)
{
    __wake_up_common(wq_head, mode, 1, 0, key, NULL);
}

Finally, __wake_up_common is declared as:

/*
 * The core wakeup function. Non-exclusive wakeups (nr_exclusive == 0) just
 * wake everything up. If it's an exclusive wakeup (nr_exclusive == small +ve
 * number) then we wake all the non-exclusive tasks and one exclusive task.
 *
 * There are circumstances in which we can try to wake a task which has already
 * started to run but is not in state TASK_RUNNING. try_to_wake_up() returns
 * zero in this (rare) case, and we handle it by continuing to scan the queue.
 */
static int __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode,
            int nr_exclusive, int wake_flags, void *key,
            wait_queue_entry_t *bookmark)

Note the nr_exclusive argument: __wake_up_locked_key passes 1, which means that writing to an eventfd wakes up only one exclusive waiter.
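To make the nr_exclusive rule concrete, here is a simplified user-space model of the scan described in the comment above. It is not kernel code: struct waiter and model_wake_up_common() are hypothetical names, and a waiter that is already running stands in for the case where try_to_wake_up() returns zero. With nr_exclusive = 1, a queue of exclusive waiters gets exactly one member woken per call, however large the value written to the eventfd was.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a wait_queue_entry_t. */
struct waiter {
    bool exclusive;  /* queued with __add_wait_queue_exclusive(), as ep_poll() does */
    bool running;    /* already woken: the wake function would return 0 */
};

/* Model of the __wake_up_common() scan: every non-exclusive waiter is woken,
 * and the scan stops once nr_exclusive exclusive waiters have been woken.
 * nr_exclusive == 0 means "wake everything". Returns how many were woken. */
static int model_wake_up_common(struct waiter *q, size_t len, int nr_exclusive)
{
    assert(q != NULL || len == 0);
    int woken = 0;
    for (size_t i = 0; i < len; i++) {
        if (q[i].running)
            continue;                      /* wakeup had no effect, keep scanning */
        q[i].running = true;
        woken++;
        if (q[i].exclusive && --nr_exclusive == 0)
            break;                         /* exclusive budget used up */
    }
    return woken;
}
```

With four exclusive waiters, one call wakes q[0] only; a second call skips q[0] (already running) and wakes q[1]. In this model, that is why several writes of 1 reach several threads while one write of 4 reaches a single thread.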
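What does exclusive mean? The epoll_ctl man page sheds some light on it:
EPOLLEXCLUSIVE (since Linux 4.5):
Sets an exclusive wakeup mode for the epoll file descriptor that is being attached to the target file descriptor, fd. When a wakeup event occurs and multiple epoll file descriptors are attached to the same target file using EPOLLEXCLUSIVE, one or more of the epoll file descriptors will receive an event with epoll_wait(2).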
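A minimal sketch of how the flag is passed, assuming Linux 4.5 or later and a `<sys/epoll.h>` that defines EPOLLEXCLUSIVE; attach_exclusive() and count_ready_instances() are hypothetical helper names. As the man page wording says, the flag concerns several epoll file descriptors attached to the same target; it is not a setting for threads that share one epoll descriptor, which is the situation in the question.

```c
#include <assert.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

/* Hypothetical helper: create an epoll instance and attach `target` to it in
 * exclusive wakeup mode. EPOLLEXCLUSIVE is accepted only with EPOLL_CTL_ADD.
 * Returns the new epoll fd, or -1 on error. */
static int attach_exclusive(int target)
{
    int epfd = epoll_create1(EPOLL_CLOEXEC);
    if (epfd < 0)
        return -1;

    struct epoll_event ev = { .events = EPOLLIN | EPOLLEXCLUSIVE, .data.fd = target };
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, target, &ev) != 0) {
        close(epfd);
        return -1;
    }
    return epfd;
}

/* Attach one eventfd to two epoll instances, signal it once and count how many
 * instances report it readable. The man page only promises "one or more". */
static int count_ready_instances(void)
{
    int efd = eventfd(0, EFD_NONBLOCK);
    assert(efd >= 0);
    int ep[2] = { attach_exclusive(efd), attach_exclusive(efd) };
    assert(ep[0] >= 0 && ep[1] >= 0);        /* needs Linux >= 4.5 */

    eventfd_write(efd, 1);

    int ready = 0;
    for (int i = 0; i < 2; i++) {
        struct epoll_event out;
        if (epoll_wait(ep[i], &out, 1, 0) == 1)   /* timeout 0: poll only */
            ready++;
        close(ep[i]);
    }
    close(efd);
    return ready;
}
```

The result may be 1 or 2 depending on whether any thread was actually sleeping in epoll_wait() when the write happened, which is exactly the latitude the man page leaves.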
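You did not use EPOLLEXCLUSIVE when adding the event, but in order to wait in epoll_wait, every thread has to put itself on a wait queue. The function do_epoll_wait performs the wait by calling ep_poll. In the following code from fs/eventpoll.c you can see that it adds the current thread to the wait queue on line #1903: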

__add_wait_queue_exclusive(&ep->wq, &wait);

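This is why epoll waiters are *exclusive*, and only one thread is woken up. This behavior was introduced in v2.6.22-rc1, and the related change was discussed here.
To me this looks like a bug in eventfd_signal: in semaphore mode it should perform the wakeup with nr_exclusive equal to the value written.
So your options are: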

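  • Create a separate epoll descriptor for each thread (may not fit your design; scaling issues)
  • Put a mutex around it (scaling issues)
  • Use poll, possibly on both the eventfd and the epoll descriptor
  • Wake each thread separately by writing 1 four times with eventfd_write (probably the best you can do).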
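The last option could look roughly like the sketch below, assuming Linux and compilation with `-pthread`; struct ctx, worker() and run_shutdown() are hypothetical names, and the 64-thread cap is arbitrary. Each worker consumes exactly one token, and the main thread posts one token per worker with separate eventfd_write(fd, 1) calls, so that each write triggers its own wakeup. Unlike the question's code, the workers wait with a 100 ms timeout in a bounded loop purely as a safety net against hanging, and they check eventfd_read() so that a thread that loses the race simply waits again.

```c
#define _GNU_SOURCE  /* usleep() under strict -std modes */
#include <assert.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

/* Hypothetical shared context for the workers. */
struct ctx {
    int epoll_fd;
    int event_fd;
};

static void *worker(void *arg)
{
    const struct ctx *c = arg;
    for (int tries = 0; tries < 50; tries++) {       /* at most ~5 s */
        struct epoll_event ev;
        if (epoll_wait(c->epoll_fd, &ev, 1, 100) != 1)
            continue;                                /* timeout or EINTR */
        if (ev.data.fd != c->event_fd || !(ev.events & EPOLLIN))
            continue;                                /* some other descriptor */
        eventfd_t val;
        if (eventfd_read(c->event_fd, &val) == 0)
            return arg;                              /* non-NULL: got a stop token */
        /* EAGAIN: another thread took the token first, wait again */
    }
    return NULL;
}

/* Post one token per thread with separate writes of 1.
 * Returns how many workers consumed a stop token, or -1 on setup errors. */
static int run_shutdown(int nthreads)
{
    enum { MAX_THREADS = 64 };
    if (nthreads < 1 || nthreads > MAX_THREADS)
        return -1;

    struct ctx c = {
        .epoll_fd = epoll_create1(0),
        .event_fd = eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK),
    };
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = c.event_fd };
    if (c.epoll_fd < 0 || c.event_fd < 0 ||
        epoll_ctl(c.epoll_fd, EPOLL_CTL_ADD, c.event_fd, &ev) != 0) {
        if (c.event_fd >= 0) close(c.event_fd);
        if (c.epoll_fd >= 0) close(c.epoll_fd);
        return -1;
    }

    pthread_t thrd[MAX_THREADS];
    for (int i = 0; i < nthreads; i++) {
        int rc = pthread_create(&thrd[i], NULL, worker, &c);
        assert(rc == 0);                   /* sketch: no recovery on failure */
        (void) rc;
    }

    usleep(100000);                        /* let the workers block in epoll_wait() */
    for (int i = 0; i < nthreads; i++)
        eventfd_write(c.event_fd, 1);      /* one write per thread */

    int stopped = 0;
    for (int i = 0; i < nthreads; i++) {
        void *res;
        pthread_join(thrd[i], &res);
        stopped += (res != NULL);
    }
    close(c.event_fd);
    close(c.epoll_fd);
    return stopped;
}
```

Calling run_shutdown(4) from main() should return 4 once every worker has consumed one token.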

Answer #2 (2w2cym1i)

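With current Linux versions (e.g. Ubuntu 22.04 LTS), the code in the question works perfectly well. I edited it slightly and added some error checking and timing output. In particular, you should always check the return code of eventfd_read() to detect spurious wakeups: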

#include <sys/time.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>

static int event_fd = -1;
static int epoll_fd = -1;

struct thread_data {
    int id;
};

void *thread(void *arg)
{
    struct thread_data* data = (struct thread_data *) arg;
    struct timeval tv;
    gettimeofday(&tv, NULL);
    printf("Thread %d started at %ld.%06ld\n", data->id, tv.tv_sec, tv.tv_usec);

    for(;;) {
        struct epoll_event event;
        int rc = epoll_wait(epoll_fd, &event, 1, -1);

        /* handle events */
        if(rc == 1 && event.data.fd == event_fd && event.events & EPOLLIN) {
            uint64_t val = 0;
            if(eventfd_read(event_fd, &val) >= 0) {
                gettimeofday(&tv, NULL);
                printf("Thread %d received stop signal at %ld.%06ld\n",
                       data->id, tv.tv_sec, tv.tv_usec);
                break;
            } else {
                gettimeofday(&tv, NULL);
                printf("Thread %d received spurious wake up at %ld.%06ld\n",
                       data->id, tv.tv_sec, tv.tv_usec);
            }
        }
    }

    return NULL;
}

int main(void)
{
    enum { THREADS = 4 };
    enum { WAKE_FIRST = 1 };

    epoll_fd = epoll_create1(0);
    event_fd = eventfd(0, EFD_SEMAPHORE| EFD_NONBLOCK);

    struct epoll_event event;
    event.events = EPOLLIN;
    event.data.fd = event_fd;
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, event_fd, &event);

    pthread_t thrd[THREADS];
    struct thread_data data[THREADS];

    for(int i = 0; i < THREADS; i++) {
        data[i].id = i;
        pthread_create(&thrd[i], NULL, &thread, (void *) &data[i]);
    }

    /* let threads reach epoll_wait() : */
    usleep(100000);

    struct timeval tv;
    gettimeofday(&tv, NULL);
    printf("\nSending wake signal to %d threads at %ld.%06ld\n",
           WAKE_FIRST, tv.tv_sec, tv.tv_usec);
    eventfd_write(event_fd, WAKE_FIRST);

    if(THREADS > WAKE_FIRST) {
        usleep(100000);
        gettimeofday(&tv, NULL);
        printf("\nSending wake signal to %d threads at %ld.%06ld\n",
               THREADS - WAKE_FIRST, tv.tv_sec, tv.tv_usec);
        eventfd_write(event_fd, THREADS - WAKE_FIRST);
    }

    for(int i = 0; i < THREADS; i++) {
        pthread_join(thrd[i], NULL);
    }
}

Typical output:

Thread 0 started at 1679048746.554414
Thread 1 started at 1679048746.554440
Thread 2 started at 1679048746.554455
Thread 3 started at 1679048746.554492

Sending wake signal to 1 threads at 1679048746.655088
Thread 3 received stop signal at 1679048746.655170

Sending wake signal to 3 threads at 1679048746.755238
Thread 2 received stop signal at 1679048746.755341
Thread 1 received stop signal at 1679048746.755414
Thread 0 received stop signal at 1679048746.755479

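Some further observations:

  • The code also works for other values of THREADS and WAKE_FIRST.
  • Things even work correctly if eventfd_write(event_fd, WAKE_FIRST) is executed before the threads are created.
  • If a thread does not consume the reported event immediately and calls epoll_wait() several more times before finally executing eventfd_read(), these repeated calls to epoll_wait() return immediately.
  • If a thread executes a (silly) usleep() before calling eventfd_read(), this causes spurious wakeups of the other thread(s) that were signaled first and are taking part in the blocking system call. In my opinion this is a nice feature, not a bug. And yes, as with everything involving locking, you should always check for spurious wakeups.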
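The third observation, and the spurious-wakeup case from the fourth, can be reproduced on a single thread. A minimal sketch, assuming Linux; is_ready() and level_triggered_demo() are hypothetical helper names, and epoll_wait() with a timeout of 0 serves as a non-blocking readiness probe. Each assert() encodes one expected step: with a level-triggered registration (no EPOLLET) the eventfd keeps being reported while its counter is non-zero, and a read after the counter is drained fails with EAGAIN, which is what a thread that was woken but lost the race would see.

```c
#include <assert.h>
#include <errno.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

/* Non-blocking readiness probe: epoll_wait() with a timeout of 0. */
static int is_ready(int epfd)
{
    struct epoll_event ev;
    return epoll_wait(epfd, &ev, 1, 0) == 1;
}

/* Single-threaded walk-through; returns 0 if every expectation held. */
static int level_triggered_demo(void)
{
    int efd = eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK);
    int epfd = epoll_create1(0);
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = efd };  /* no EPOLLET */
    if (efd < 0 || epfd < 0 || epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &ev) != 0)
        return -1;

    eventfd_t val = 0;
    int r;

    assert(!is_ready(epfd));                 /* counter 0: nothing reported */

    eventfd_write(efd, 2);                   /* post two tokens */
    assert(is_ready(epfd));                  /* repeated epoll_wait() calls keep */
    assert(is_ready(epfd));                  /* returning while nobody has read */

    r = eventfd_read(efd, &val);
    assert(r == 0 && val == 1);              /* semaphore mode: one token per read */
    assert(is_ready(epfd));                  /* one token left: still readable */

    r = eventfd_read(efd, &val);
    assert(r == 0);
    assert(!is_ready(epfd));                 /* drained: no longer reported */

    /* What a thread that was woken but lost the race would observe: */
    r = eventfd_read(efd, &val);
    assert(r == -1 && errno == EAGAIN);
    (void) r;

    close(epfd);
    close(efd);
    return 0;
}
```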
