使用管道在 C++ 中创建调度队列/线程处理程序:FIFO 溢出

Creating a dispatch queue / thread handler in C++ with pipes: FIFOs overfilling

线程的创建和使用需要大量资源,因此线程池通常会被异步任务重用。任务被打包,然后 "posted" 到代理,该代理将在下一个可用线程上排队任务。

这就是调度队列(即 Apple 的 Grand Central Dispatch)和线程处理程序(Android 的 Looper 机制)背后的想法。

现在,我正在尝试自己动手。事实上,我正在填补 Android 中的空白,其中 API 用于在 Java 中发布任务,但在本机 NDK 中没有。但是,我尽可能地保持这个问题平台独立。

管道是我的方案的理想选择。我可以轻松地在我的工作线程上轮询 pipe(2) 的读取端的文件描述符,并通过写入写入端将来自任何其他线程的任务排队。这是它的样子:

int taskRead, taskWrite;

void setup() {
    // Create the pipe
    int taskPipe[2];
    ::pipe(taskPipe);
    taskRead = taskPipe[0];
    taskWrite = taskPipe[1];

    // Set up a routine that is called when task_r reports new data
    function_that_polls_file_descriptor(taskRead, []() {
        // Read the callback data
        std::function<void(void)>* taskPtr;
        ::read(taskRead, &taskPtr, sizeof(taskPtr));

        // Run the task - this is unsafe! See below.
        (*taskPtr)();

        // Clean up
        delete taskPtr;
    });
}

void post(const std::function<void(void)>& task) {
    // Copy the function onto the heap
    auto* taskPtr = new std::function<void(void)>(task);

    // Write the pointer to the pipe - this may block if the FIFO is full!
    ::write(taskWrite, &taskPtr, sizeof(taskPtr));
}

此代码将 std::function 放在堆上,并将指针传递给管道。 function_that_polls_file_descriptor 然后调用提供的表达式来读取管道并执行函数。请注意,此示例中没有安全检查。

这在 99% 的情况下都很好,但有一个主要缺点。管道的大小有限,如果管道已满,则对 post() 的调用将挂起。这本身并不是不安全的,直到在任务中调用post()

auto evil = []() {
    // Post a new task back onto the queue
    post({});
    // Not enough new tasks, let's make more!
    for (int i = 0; i < 3; i++) {
        post({});
    }

    // Now for each time this task is posted, 4 more tasks will be added to the queue.
});

post(evil);
post(evil);
...

如果发生这种情况,那么工作线程将被阻塞,等待写入管道。但是管道的FIFO已经满了,工作线程没有从中读取任何东西,所以整个系统陷入了死锁。

如何确保从工作线程发出的对post()的调用总是成功,从而允许工作线程在事件发生时继续处理队列已满?

您可以使用旧的select来确定文件描述符是否准备好用于写入:

The file descriptors in writefds will be watched to see if space is available for write (though a large write may still block).

因为你写的是指针,你的write()根本不能归类为大。

显然你必须准备好处理 post 可能失败的事实,然后准备好稍后重试......否则你将面临无限增长的管道,直到你的系统再次崩溃.

或多或少(未测试):

bool post(const std::function<void(void)>& task) {
    bool post_res = false;

    // Copy the function onto the heap
    auto* taskPtr = new std::function<void(void)>(task);

    fd_set wfds;
    struct timeval tv;
    int retval;

    FD_ZERO(&wfds);
    FD_SET(taskWrite, &wfds);

    // Don't wait at all
    tv.tv_sec = 0;
    tv.tv_usec = 0;

    retval = select(1, NULL, &wfds, NULL, &tv);
    // select() returns 0 when no FD's are ready
    if (retval == -1) {
      // handle error condition
    } else if (retval > 0) {
      // Write the pointer to the pipe. This write will succeed
      ::write(taskWrite, &taskPtr, sizeof(taskPtr));
      post_res = true;
    }
    return post_res;
}

使管道写入文件描述符成为非阻塞的,这样当管道已满时 write 失败并显示 EAGAIN


一项改进是增加管道缓冲区大小。

另一个是使用 UNIX socket/socketpair 并增加套接字缓冲区大小。

另一种解决方案是使用 UNIX 数据报套接字,许多工作线程可以从中读取数据报,但只有一个可以获取下一个数据报。换句话说,您可以将数据报套接字用作线程调度程序。

感谢此 post 中的所有评论和其他答案,我现在有了解决此问题的可行方法。

我采用的技巧是通过检查哪个线程正在调用 post() 来确定工作线程的优先级。这是粗略的算法:

pipe ← NON-BLOCKING-PIPE()
overflow ← Ø
POST(task)
    success ← WRITE(task, pipe)
    IF NOT success THEN
        IF THREAD-IS-WORKER() THEN
            overflow ← overflow ∪ {task}
        ELSE
            WAIT(pipe)
            POST(task)

然后在工作线程上:

LOOP FOREVER
    task ← READ(pipe)
    RUN(task)

    FOR EACH overtask ∈ overflow
        RUN(overtask)

    overflow ← Ø

等待是用 pselect(2) 执行的,改编自@Sigismondo 的答案。

这是在我的原始代码示例中实现的算法,它将适用于单个工作线程(尽管我没有在复制粘贴后对其进行测试)。它可以通过为每个线程设置一个单独的溢出队列来扩展为线程池工作。

int taskRead, taskWrite;

// These variables are only allowed to be modified by the worker thread
std::__thread_id workerId;
std::queue<std::function<void(void)>> overflow;
bool overflowInUse;

void setup() {
    int taskPipe[2];
    ::pipe(taskPipe);
    taskRead = taskPipe[0];
    taskWrite = taskPipe[1];

    // Make the pipe non-blocking to check pipe overflows manually
    ::fcntl(taskWrite, F_SETFL, ::fcntl(taskWrite, F_GETFL, 0) | O_NONBLOCK);

    // Save the ID of this worker thread to compare later
    workerId = std::this_thread::get_id();
    overflowInUse = false;

    function_that_polls_file_descriptor(taskRead, []() {
        // Read the callback data
        std::function<void(void)>* taskPtr;
        ::read(taskRead, &taskPtr, sizeof(taskPtr));

        // Run the task
        (*taskPtr)();
        delete taskPtr;

        // Run any tasks that were posted to the overflow
        while (!overflow.empty()) {
            taskPtr = overflow.front();
            overflow.pop();

            (*taskPtr)();
            delete taskPtr;
        }

        // Release the overflow mechanism if applicable
        overflowInUse = false;
    });
}

bool write(std::function<void(void)>* taskPtr, bool blocking = true) {
    ssize_t rc = ::write(taskWrite, &taskPtr, sizeof(taskPtr));

    // Failure handling
    if (rc < 0) {
        // If blocking is allowed, wait for pipe to become available
        int err = errno;
        if ((errno == EAGAIN || errno == EWOULDBLOCK) && blocking) {
            fd_set fds;
            FD_ZERO(&fds);
            FD_SET(taskWrite, &fds);

            ::pselect(1, nullptr, &fds, nullptr, nullptr, nullptr);

            // Try again
            return write(tdata);
        }

        // Otherwise return false
        return false;
    }

    return true;
}

void post(const std::function<void(void)>& task) {
    auto* taskPtr = new std::function<void(void)>(task);

    if (std::this_thread::get_id() == workerId) {
        // The worker thread gets 1st-class treatment.
        // It won't be blocked if the pipe is full, instead
        // using an overflow queue until the overflow has been cleared.
        if (!overflowInUse) {
            bool success = write(taskPtr, false);
            if (!success) {
                overflow.push(taskPtr);
                overflowInUse = true;
            }
        } else {
            overflow.push(taskPtr);
        }
    } else {
        write(taskPtr);
    }
}

如果您只看Android/Linux,使用管道并不是最先进的方法,而是将事件文件描述符与 epoll 一起使用是可行的方法。