Resuming asio coroutine from another thread
I'm having a problem resuming a boost::asio coroutine from another thread. Here is the sample code:
#include <iostream>
#include <thread>
#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/spawn.hpp>
using namespace std;
using namespace boost;
void foo(asio::steady_timer& timer, asio::yield_context yield)
{
cout << "Enter foo" << endl;
timer.expires_from_now(asio::steady_timer::clock_type::duration::max());
timer.async_wait(yield);
cout << "Leave foo" << endl;
}
void bar(asio::steady_timer& timer)
{
cout << "Enter bar" << endl;
sleep(1); // wait a little for asio::io_service::run to be executed
timer.cancel();
cout << "Leave bar" << endl;
}
int main()
{
asio::io_service ioService;
asio::steady_timer timer(ioService);
asio::spawn(ioService, bind(foo, std::ref(timer), placeholders::_1));
thread t(bar, std::ref(timer));
ioService.run();
t.join();
return 0;
}
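The problem is that the asio::steady_timer object is not thread-safe, so the program crashes. But if I try to use a mutex to synchronize access to it, I get a deadlock, because the lock is still held while foo is suspended inside its locked scope.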
#include <iostream>
#include <thread>
#include <mutex>
#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/spawn.hpp>
using namespace std;
using namespace boost;
void foo(asio::steady_timer& timer, mutex& mtx, asio::yield_context yield)
{
cout << "Enter foo" << endl;
{
lock_guard<mutex> lock(mtx);
timer.expires_from_now(
asio::steady_timer::clock_type::duration::max());
timer.async_wait(yield);
}
cout << "Leave foo" << endl;
}
void bar(asio::steady_timer& timer, mutex& mtx)
{
cout << "Enter bar" << endl;
sleep(1); // wait a little for asio::io_service::run to be executed
{
lock_guard<mutex> lock(mtx);
timer.cancel();
}
cout << "Leave bar" << endl;
}
int main()
{
asio::io_service ioService;
asio::steady_timer timer(ioService);
mutex mtx;
asio::spawn(ioService, bind(foo, std::ref(timer), std::ref(mtx),
placeholders::_1));
thread t(bar, std::ref(timer), std::ref(mtx));
ioService.run();
t.join();
return 0;
}
If I use a standard completion handler instead of a coroutine, there is no such problem.
#include <iostream>
#include <thread>
#include <mutex>
#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>
using namespace std;
using namespace boost;
void baz(system::error_code ec)
{
cout << "Baz: " << ec.message() << endl;
}
void foo(asio::steady_timer& timer, mutex& mtx)
{
cout << "Enter foo" << endl;
{
lock_guard<mutex> lock(mtx);
timer.expires_from_now(
asio::steady_timer::clock_type::duration::max());
timer.async_wait(baz);
}
cout << "Leave foo" << endl;
}
void bar(asio::steady_timer& timer, mutex& mtx)
{
cout << "Enter bar" << endl;
sleep(1); // wait a little for asio::io_service::run to be executed
{
lock_guard<mutex> lock(mtx);
timer.cancel();
}
cout << "Leave bar" << endl;
}
int main()
{
asio::io_service ioService;
asio::steady_timer timer(ioService);
mutex mtx;
foo(std::ref(timer), std::ref(mtx));
thread t(bar, std::ref(timer), std::ref(mtx));
ioService.run();
t.join();
return 0;
}
Is it possible to get behavior similar to the last example when using coroutines?
A coroutine runs within the context of a strand. In spawn(), if one is not explicitly provided, a new strand will be created for the coroutine. By explicitly providing the strand to spawn(), one can post work into the strand that will be synchronized with the coroutine.
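To make the idea of "posting into a strand" concrete, here is a toy, standard-library-only sketch of a serialized handler queue. It is not how Boost.Asio implements strands; SerialQueue and cancel_from_other_thread are hypothetical names used only for illustration. The point it shows is that another thread never touches the shared state directly: it only enqueues a handler, and the handler runs later on the thread that drains the queue.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Toy stand-in for a strand (hypothetical, not Boost.Asio's implementation):
// handlers may be posted from any thread, but they are only invoked one at a
// time, by the thread that calls run_all().
class SerialQueue {
public:
    void post(std::function<void()> handler) {
        std::lock_guard<std::mutex> lock(mutex_);  // protects the queue only
        handlers_.push(std::move(handler));
    }

    // Runs queued handlers in FIFO order and returns how many were run.
    std::size_t run_all() {
        std::size_t count = 0;
        for (;;) {
            std::function<void()> handler;
            {
                std::lock_guard<std::mutex> lock(mutex_);
                if (handlers_.empty()) return count;
                handler = std::move(handlers_.front());
                handlers_.pop();
            }
            handler();  // invoked without holding the queue lock
            ++count;
        }
    }

private:
    std::mutex mutex_;
    std::queue<std::function<void()>> handlers_;
};

// Hypothetical demo: a second thread requests "cancellation" by posting a
// handler instead of modifying the flag itself.
bool cancel_from_other_thread() {
    SerialQueue queue;
    bool cancelled = false;  // only touched by handlers run via run_all()

    std::thread other([&queue, &cancelled] {
        queue.post([&cancelled] { cancelled = true; });
    });
    other.join();

    const std::size_t ran = queue.run_all();  // handler runs on this thread
    assert(ran == 1);
    return cancelled;
}
```

In the real program, strand.post() plays the role of post() and io_service::run() plays the role of run_all(), with the extra guarantee that handlers of one strand never run concurrently even when several threads call run().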
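Also, as noted by sehe, undefined behavior may occur if the coroutine runs in one thread, acquires a mutex lock, and then suspends, but resumes and runs in a different thread and releases the lock there. To avoid this, ideally one should not hold locks while the coroutine is suspended. However, if it is necessary, one must guarantee that the coroutine runs within the same thread when it is resumed, such as by only running the io_service from a single thread.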
Here is a minimal complete example based on the original example, where bar() posts work into the strand to cancel the timer, causing the foo() coroutine to resume:
#include <chrono>      // std::chrono::seconds
#include <functional>  // std::bind, std::ref
#include <iostream>
#include <thread>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
void foo(boost::asio::steady_timer& timer, boost::asio::yield_context yield)
{
std::cout << "Enter foo" << std::endl;
timer.expires_from_now(
boost::asio::steady_timer::clock_type::duration::max());
boost::system::error_code error;
timer.async_wait(yield[error]);
std::cout << "foo error: " << error.message() << std::endl;
std::cout << "Leave foo" << std::endl;
}
void bar(
boost::asio::io_service::strand& strand,
boost::asio::steady_timer& timer
)
{
std::cout << "Enter bar" << std::endl;
// Wait a little for asio::io_service::run to be executed
std::this_thread::sleep_for(std::chrono::seconds(1));
// Post timer cancellation into the strand.
strand.post([&timer]()
{
timer.cancel();
});
std::cout << "Leave bar" << std::endl;
}
int main()
{
boost::asio::io_service io_service;
boost::asio::steady_timer timer(io_service);
boost::asio::io_service::strand strand(io_service);
// Use an explicit strand, rather than having the io_service create.
boost::asio::spawn(strand, std::bind(&foo,
std::ref(timer), std::placeholders::_1));
// Pass the same strand to the thread, so that the thread may post
// handlers synchronized with the foo coroutine.
std::thread t(&bar, std::ref(strand), std::ref(timer));
io_service.run();
t.join();
}
Which produces the following output:
Enter foo
Enter bar
foo error: Operation canceled
Leave foo
Leave bar
As covered in this answer, when the boost::asio::yield_context detects that the asynchronous operation has failed, such as when the operation is canceled, it converts the boost::system::error_code into a system_error exception and throws. The above example uses yield_context::operator[] to allow the yield_context to populate the provided error_code upon failure instead of throwing.
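The two reporting styles can be sketched with the standard library's std::error_code and std::system_error, which boost::system mirrors. This is only an analogy under that assumption: finish_or_throw and finish_into are hypothetical helpers, not Boost.Asio functions. The first behaves like a plain yield (throw on failure), the second like yield[ec] (store the error for the caller to inspect).

```cpp
#include <cassert>
#include <system_error>

// Hypothetical helper: reports failure by throwing, as a plain `yield` does.
void finish_or_throw(const std::error_code& result) {
    if (result) {
        throw std::system_error(result);
    }
}

// Hypothetical helper: reports failure through an out-parameter, as
// `yield[ec]` does. It never throws.
void finish_into(const std::error_code& result, std::error_code& out) noexcept {
    out = result;
}

// Throwing style: the caller needs a try/catch to see the cancellation.
bool throwing_style_throws() {
    try {
        finish_or_throw(std::make_error_code(std::errc::operation_canceled));
    } catch (const std::system_error& e) {
        return e.code() == std::errc::operation_canceled;
    }
    return false;  // not reached for a failing result
}

// Capturing style: the caller checks the error_code after the call.
bool capturing_style_reports() {
    std::error_code ec;
    assert(!ec);  // a default-constructed error_code means "no error"
    finish_into(std::make_error_code(std::errc::operation_canceled), ec);
    return ec == std::errc::operation_canceled;
}
```

In the coroutine above, the capturing style is what lets foo() print the error and reach "Leave foo". With a plain yield, the cancellation would surface as an exception inside foo() instead.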