使用 C++20 协程在线程之间切换
Switching between threads with C++20 coroutines
有一个 example 使用 C++20 协程切换到不同的线程:
#include <coroutine>
#include <iostream>
#include <stdexcept>
#include <thread>
auto switch_to_new_thread(std::jthread& out) {
struct awaitable {
std::jthread* p_out;
bool await_ready() { return false; }
void await_suspend(std::coroutine_handle<> h) {
std::jthread& out = *p_out;
if (out.joinable())
throw std::runtime_error("Output jthread parameter not empty");
out = std::jthread([h] { h.resume(); });
// Potential undefined behavior: accessing potentially destroyed *this
// std::cout << "New thread ID: " << p_out->get_id() << '\n';
std::cout << "New thread ID: " << out.get_id() << '\n'; // this is OK
}
void await_resume() {}
};
return awaitable{ &out };
}
struct task {
struct promise_type {
task get_return_object() { return {}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() {}
};
};
task resuming_on_new_thread(std::jthread& out) {
std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << '\n';
co_await switch_to_new_thread(out);
// awaiter destroyed here
std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n';
}
int main() {
std::jthread out;
resuming_on_new_thread(out);
}
协程在主线程上启动并切换到新创建的线程。
让它切换回主线程的正确方法是什么?
所以下面的代码
task resuming_on_new_thread(std::jthread& out) {
std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << '\n';
co_await switch_to_new_thread(out);
// awaiter destroyed here
std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n';
co_await switch_to_main_thread();
std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n';
}
会打印
Coroutine started on thread: 139972277602112
New thread ID: 139972267284224
Coroutine resumed on thread: 139972267284224
Coroutine resumed on thread: 139972277602112
实现这一点的一种方法是拥有一个 thread-safe 队列,协程将自己放入该队列以告诉主线程“请立即恢复我”。到那时,您基本上是在构建一个线程池。主要功能必须监视该队列(定期轮询或等待将某些内容放入其中),然后在可用时获取并执行一个元素(工作项)。
switch_to_new_thread
实际上 创建一个新线程 ,它不会 切换到一个新线程 。然后它会注入恢复协程的代码。
要在特定线程上 运行 编码,您实际上必须在该线程上 运行 编码。要恢复协程,该特定线程必须 运行 恢复该协程的代码。
在这里,您通过创建一个 brand-new 线程并注入执行 resume
.
的代码来做到这一点
执行此类操作的传统方法是使用消息泵。您要参与的线程有一个消息泵和一个事件队列。它 运行 按顺序排列事件。
要创建特定线程 运行 一些代码,您可以向该事件队列发送一条消息,其中包含指令(可能是实际代码,可能只是一个值)。
为此,这样的“事件消费线程”不止一个std::jthread
或std::thread
;它是一个线程安全队列,线程中的一些任务从中弹出任务并执行它们。
在这样的系统中,您可以通过发送消息在线程之间移动。
所以你有一个队列:
template<class T>
struct threadsafe_queue {
[[nodiscard]] std::optional<T> pop();
[[nodiscard]] std::deque<T> pop_many(std::optional<std::size_t> count = {}); // defaults to all
[[nodiscard]] bool push(T);
template<class C, class D>
[[nodiscard]] std::optional<T> wait_until_pop(std::chrono::time_point<C,D>);
void abort();
[[nodiscard]] bool is_aborted() const { return aborted; }
private:
mutable std::mutex m;
std::condition_variable cv;
std::deque<T> queue;
bool aborted = false;
auto lock() const { return std::unique_lock(m); }
};
任务数:
using task_queue = threadsafe_queue<std::function<void()>>;
一个基本的消息泵是:
void message_pump( task_queue& q ) {
while (auto f = q.pop()) {
if (*f) (*f)();
}
}
然后您将创建两个 task_queues,一个用于您的主线程,一个用于您的辅助线程。要切换到 worker 而不是创建新的 jthread
,您需要:
workerq.push( [&]{ h.resume(); } );
同样切换到主
mainq.push( [&]{ h.resume(); } );
我跳过了很多细节,但这是您如何操作的草图。
有一个 example 使用 C++20 协程切换到不同的线程:
#include <coroutine>
#include <iostream>
#include <stdexcept>
#include <thread>
auto switch_to_new_thread(std::jthread& out) {
struct awaitable {
std::jthread* p_out;
bool await_ready() { return false; }
void await_suspend(std::coroutine_handle<> h) {
std::jthread& out = *p_out;
if (out.joinable())
throw std::runtime_error("Output jthread parameter not empty");
out = std::jthread([h] { h.resume(); });
// Potential undefined behavior: accessing potentially destroyed *this
// std::cout << "New thread ID: " << p_out->get_id() << '\n';
std::cout << "New thread ID: " << out.get_id() << '\n'; // this is OK
}
void await_resume() {}
};
return awaitable{ &out };
}
struct task {
struct promise_type {
task get_return_object() { return {}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() {}
};
};
task resuming_on_new_thread(std::jthread& out) {
std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << '\n';
co_await switch_to_new_thread(out);
// awaiter destroyed here
std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n';
}
int main() {
std::jthread out;
resuming_on_new_thread(out);
}
协程在主线程上启动并切换到新创建的线程。
让它切换回主线程的正确方法是什么?
所以下面的代码
task resuming_on_new_thread(std::jthread& out) {
std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << '\n';
co_await switch_to_new_thread(out);
// awaiter destroyed here
std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n';
co_await switch_to_main_thread();
std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n';
}
会打印
Coroutine started on thread: 139972277602112
New thread ID: 139972267284224
Coroutine resumed on thread: 139972267284224
Coroutine resumed on thread: 139972277602112
实现这一点的一种方法是拥有一个 thread-safe 队列,协程将自己放入该队列以告诉主线程“请立即恢复我”。到那时,您基本上是在构建一个线程池。主要功能必须监视该队列(定期轮询或等待将某些内容放入其中),然后在可用时获取并执行一个元素(工作项)。
switch_to_new_thread
实际上 创建一个新线程 ,它不会 切换到一个新线程 。然后它会注入恢复协程的代码。
要在特定线程上 运行 编码,您实际上必须在该线程上 运行 编码。要恢复协程,该特定线程必须 运行 恢复该协程的代码。
在这里,您通过创建一个 brand-new 线程并注入执行 resume
.
执行此类操作的传统方法是使用消息泵。您要参与的线程有一个消息泵和一个事件队列。它 运行 按顺序排列事件。
要创建特定线程 运行 一些代码,您可以向该事件队列发送一条消息,其中包含指令(可能是实际代码,可能只是一个值)。
为此,这样的“事件消费线程”不止一个std::jthread
或std::thread
;它是一个线程安全队列,线程中的一些任务从中弹出任务并执行它们。
在这样的系统中,您可以通过发送消息在线程之间移动。
所以你有一个队列:
template<class T>
struct threadsafe_queue {
[[nodiscard]] std::optional<T> pop();
[[nodiscard]] std::deque<T> pop_many(std::optional<std::size_t> count = {}); // defaults to all
[[nodiscard]] bool push(T);
template<class C, class D>
[[nodiscard]] std::optional<T> wait_until_pop(std::chrono::time_point<C,D>);
void abort();
[[nodiscard]] bool is_aborted() const { return aborted; }
private:
mutable std::mutex m;
std::condition_variable cv;
std::deque<T> queue;
bool aborted = false;
auto lock() const { return std::unique_lock(m); }
};
任务数:
using task_queue = threadsafe_queue<std::function<void()>>;
一个基本的消息泵是:
void message_pump( task_queue& q ) {
while (auto f = q.pop()) {
if (*f) (*f)();
}
}
然后您将创建两个 task_queues,一个用于您的主线程,一个用于您的辅助线程。要切换到 worker 而不是创建新的 jthread
,您需要:
workerq.push( [&]{ h.resume(); } );
同样切换到主
mainq.push( [&]{ h.resume(); } );
我跳过了很多细节,但这是您如何操作的草图。