使用 C++20 协程在线程之间切换

Switching between threads with C++20 coroutines

有一个 example 使用 C++20 协程切换到不同的线程:

#include <coroutine>
#include <iostream>
#include <stdexcept>
#include <thread>

auto switch_to_new_thread(std::jthread& out) {
    struct awaitable {
        std::jthread* p_out;
        bool await_ready() { return false; }
        void await_suspend(std::coroutine_handle<> h) {
            std::jthread& out = *p_out;
            if (out.joinable())
                throw std::runtime_error("Output jthread parameter not empty");
            out = std::jthread([h] { h.resume(); });
            // Potential undefined behavior: accessing potentially destroyed *this
            // std::cout << "New thread ID: " << p_out->get_id() << '\n';
            std::cout << "New thread ID: " << out.get_id() << '\n'; // this is OK
        }
        void await_resume() {}
    };
    return awaitable{ &out };
}

struct task {
    struct promise_type {
        task get_return_object() { return {}; }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() {}
    };
};

task resuming_on_new_thread(std::jthread& out) {
    std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << '\n';
    co_await switch_to_new_thread(out);
    // awaiter destroyed here
    std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n';
}

int main() {
    std::jthread out;
    resuming_on_new_thread(out);
}

协程在主线程上启动并切换到新创建的线程。

让它切换回主线程的正确方法是什么?

所以下面的代码

task resuming_on_new_thread(std::jthread& out) {
    std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << '\n';
    co_await switch_to_new_thread(out);
    // awaiter destroyed here
    std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n';
    co_await switch_to_main_thread();
    std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n';
}

会打印

Coroutine started on thread: 139972277602112
New thread ID: 139972267284224
Coroutine resumed on thread: 139972267284224
Coroutine resumed on thread: 139972277602112

实现这一点的一种方法是拥有一个 thread-safe 队列,协程将自己放入该队列以告诉主线程“请立即恢复我”。到那时,您基本上是在构建一个线程池。主要功能必须监视该队列(定期轮询或等待将某些内容放入其中),然后在可用时获取并执行一个元素(工作项)。

switch_to_new_thread 实际上 创建一个新线程 ,它不会 切换到一个新线程 。然后它会注入恢复协程的代码。

要在特定线程上 运行 编码,您实际上必须在该线程上 运行 编码。要恢复协程,该特定线程必须 运行 恢复该协程的代码。

在这里,您通过创建一个 brand-new 线程并注入执行 resume.

的代码来做到这一点

执行此类操作的传统方法是使用消息泵。您要参与的线程有一个消息泵和一个事件队列。它 运行 按顺序排列事件。

要创建特定线程 运行 一些代码,您可以向该事件队列发送一条消息,其中包含指令(可能是实际代码,可能只是一个值)。

为此,这样的“事件消费线程”不止一个std::jthreadstd::thread;它是一个线程安全队列,线程中的一些任务从中弹出任务并执行它们。

在这样的系统中,您可以通过发送消息在线程之间移动。

所以你有一个队列:

template<class T>
struct threadsafe_queue {
  [[nodiscard]] std::optional<T> pop();
  [[nodiscard]] std::deque<T> pop_many(std::optional<std::size_t> count = {}); // defaults to all
  [[nodiscard]] bool push(T);
  template<class C, class D>
  [[nodiscard]] std::optional<T> wait_until_pop(std::chrono::time_point<C,D>);
  void abort();
  [[nodiscard]] bool is_aborted() const { return aborted; }
private:
  mutable std::mutex m;
  std::condition_variable cv;
  std::deque<T> queue;
  bool aborted = false;
  auto lock() const { return std::unique_lock(m); }
};

任务数:

using task_queue = threadsafe_queue<std::function<void()>>;

一个基本的消息泵是:

void message_pump( task_queue& q ) {
  while (auto f = q.pop()) {
    if (*f) (*f)();
  }
}

然后您将创建两个 task_queues,一个用于您的主线程,一个用于您的辅助线程。要切换到 worker 而不是创建新的 jthread,您需要:

workerq.push( [&]{ h.resume(); } );

同样切换到主

mainq.push( [&]{ h.resume(); } );

我跳过了很多细节,但这是您如何操作的草图。