Boost:asio 和多线程中的异步
Boost:asio and async in multi-threading
我需要调用方法,这是对远程服务器的请求。之后我想等待一个答案,等待不会被其他异步阻塞function/objects(例如定时器)。
方法got_response(...) 告诉用户他从远程服务器得到了答案,方法还获取了我们作为答案得到的条目数据。下面我得到了我的解决方案,但有时可以在单线程中调用计时器,这将导致方法 got_response() 挂起。
如何在其他线程中调用定时器来保证答案模拟。我的问题还有其他解决方案吗?
#include <iostream>
#include <boost/asio.hpp>
#include <future>
#include <thread>
#include <vector>
using namespace std;
namespace io = boost::asio;
struct Reply
{
atomic<bool> ready;
atomic<int> result;
future<void> future_result;
Reply()
{
ready = false;
result = 0;
}
void call()
{
cout << "retry called!" << endl;
future_result = async([&]()
{
while (!ready)
{
this_thread::yield();
}
});
}
int get()
{
future_result.wait();
return result.load();
}
void got_response(int res)
{
result = res;
ready = true;
}
};
int main()
{
Reply reply;
reply.call();
io::io_context context(4);
io::steady_timer timer1(context, std::chrono::seconds(2));
timer1.async_wait([&](const boost::system::error_code &ec)
{ cout << "timer 1, thread name: " << this_thread::get_id() << endl; });
io::steady_timer timer2(context, std::chrono::seconds(3));
timer2.async_wait([&](const boost::system::error_code &ec)
{
cout << "timer 2, thread name: " << this_thread::get_id() << endl;
cout << reply.get() << endl;
});
io::steady_timer timer3(context, std::chrono::seconds(10));
timer3.async_wait([&](const boost::system::error_code &ec)
{
cout << "timer 3, thread name: " << this_thread::get_id() << endl;
reply.got_response(1337);
});
vector<thread> threads;
auto count = 2;
for (int n = 0; n < count; ++n)
{
threads.emplace_back([&]
{ context.run(); });
}
for (auto &th : threads)
{
th.join();
}
}
结果:
retry called!
timer 1, thread name: 140712511198784
timer 2, thread name: 140712519591488
timer 3, thread name: 140712511198784
1337
哇。这在几个层面上过于复杂。
futures 可以输入 return 值(这实际上是 future 在同步原语上的全部意义)
fturue 可以用值表示就绪,无需将就绪复制到bool
,然后将结果复制到某处
这让我很困惑:
int get()
{
_fut.wait();
return _result.load();
}
等待未来,然后 return 就是 _result
,你知道你发明 _ready
的目的是什么吗?
你知道 std::async
不是 Boost ASIO 的一部分吗?事实上,它不能很好地工作,因为正如您正确注意到的那样,它引入了(未指定数量的)线程。总的来说,我的建议是不要使用 std::async
(很难正确使用),当然也不要使用 ASIO
当您看到相同的变量名称 var1、var2、var3 时,是时候重构您的代码了(如果它包含数据成员,则重构为函数或 类):
std::deque<io::steady_timer> timers;
for (int i = 1; i <= 3; ++i) {
auto& timer = timers.emplace_back(context, std::chrono::seconds(1+i));
timer.async_wait([i](error_code ec) {
std::cout << "timer " << i
<< ", thread name: " << std::this_thread::get_id()
<< std::endl;
});
}
而不是线程向量,考虑 boost::thread_group
或者 boost::asio::thread_pool
.
如果手动运行IO线程,记得处理异常(Should the exception thrown by boost::asio::io_service::run() be caught?),所以
boost::thread_group threads;
for (int n = 0; n < 2; ++n) {
threads.create_thread([&] { context.run(); });
}
threads.join_all();
或者确实
io::thread_pool context(2);
context.join();
这是非常效率低下
while (!_ready) {
std::this_thread::yield();
}
只需设置未来值以表示它已准备就绪:
using namespace std
通常不是一个好主意 (Why is "using namespace std;" considered bad practice?)
演示
这是我对问题代码的扩展但简化的看法:
#include <boost/asio.hpp>
#include <deque>
#include <future>
#include <iostream>
#include <thread>
namespace io = boost::asio;
using namespace std::chrono_literals;
using boost::system::error_code;
// not very useful in practice, but for debug output in main
std::ostream& debug(error_code);
template <typename Fut> bool is_ready(Fut const& fut) {
return fut.wait_for(0s) == std::future_status::ready;
}
int main() {
std::promise<int> reply;
std::shared_future got_value = reply.get_future();
io::thread_pool context(2);
std::deque<io::steady_timer> timers;
for (int i = 1; i <= 10; ++i) {
timers //
.emplace_back(context, i * 1s)
.async_wait([&got_value](error_code ec) {
if (is_ready(got_value))
debug(ec) << " Reply:" << got_value.get() << std::endl;
else
debug(ec) << " (reply not ready)" << std::endl;
});
}
timers //
.emplace_back(context, 4'500ms)
.async_wait([&reply](error_code ec) {
debug(ec) << " setting value" << std::endl;
reply.set_value(1337);
});
context.join();
}
int friendly_thread_id() {
return std::hash<std::thread::id>{}(std::this_thread::get_id()) % 256;
}
#include <iomanip>
std::ostream& debug(error_code ec) {
auto now = std::chrono::system_clock::now;
static auto program_start = now();
return std::cout //
<< ((now() - program_start) / 1ms) << "ms\t"
<< "thread:" << std::hex << std::setfill('0') << std::showbase
<< std::setw(2) << friendly_thread_id() << std::dec << " ";
}
#include <iomanip>
std::ostream& debug(error_code ec) {
auto now = std::chrono::system_clock::now;
static auto program_start = now();
return std::cout //
<< ((now() - program_start) / 1ms) << "ms\t"
<< "thread:" << std::hex << std::setfill('0') << std::showbase
<< std::setw(2) << pretty_thread_id() << std::dec << " ";
}
版画
0ms thread:0x5f (reply not ready)
999ms thread:0xf3 (reply not ready)
1999ms thread:0x5f (reply not ready)
2999ms thread:0x5f (reply not ready)
3499ms thread:0xf3 setting value
3999ms thread:0x5f Reply:1337
4999ms thread:0xf3 Reply:1337
5999ms thread:0xf3 Reply:1337
6999ms thread:0xf3 Reply:1337
7999ms thread:0xf3 Reply:1337
8999ms thread:0xf3 Reply:1337
我需要调用方法,这是对远程服务器的请求。之后我想等待一个答案,等待不会被其他异步阻塞function/objects(例如定时器)。
方法got_response(...) 告诉用户他从远程服务器得到了答案,方法还获取了我们作为答案得到的条目数据。下面我得到了我的解决方案,但有时可以在单线程中调用计时器,这将导致方法 got_response() 挂起。
如何在其他线程中调用定时器来保证答案模拟。我的问题还有其他解决方案吗?
#include <iostream>
#include <boost/asio.hpp>
#include <future>
#include <thread>
#include <vector>
using namespace std;
namespace io = boost::asio;
struct Reply
{
atomic<bool> ready;
atomic<int> result;
future<void> future_result;
Reply()
{
ready = false;
result = 0;
}
void call()
{
cout << "retry called!" << endl;
future_result = async([&]()
{
while (!ready)
{
this_thread::yield();
}
});
}
int get()
{
future_result.wait();
return result.load();
}
void got_response(int res)
{
result = res;
ready = true;
}
};
int main()
{
Reply reply;
reply.call();
io::io_context context(4);
io::steady_timer timer1(context, std::chrono::seconds(2));
timer1.async_wait([&](const boost::system::error_code &ec)
{ cout << "timer 1, thread name: " << this_thread::get_id() << endl; });
io::steady_timer timer2(context, std::chrono::seconds(3));
timer2.async_wait([&](const boost::system::error_code &ec)
{
cout << "timer 2, thread name: " << this_thread::get_id() << endl;
cout << reply.get() << endl;
});
io::steady_timer timer3(context, std::chrono::seconds(10));
timer3.async_wait([&](const boost::system::error_code &ec)
{
cout << "timer 3, thread name: " << this_thread::get_id() << endl;
reply.got_response(1337);
});
vector<thread> threads;
auto count = 2;
for (int n = 0; n < count; ++n)
{
threads.emplace_back([&]
{ context.run(); });
}
for (auto &th : threads)
{
th.join();
}
}
结果:
retry called!
timer 1, thread name: 140712511198784
timer 2, thread name: 140712519591488
timer 3, thread name: 140712511198784
1337
哇。这在几个层面上过于复杂。
futures 可以输入 return 值(这实际上是 future 在同步原语上的全部意义)
fturue 可以用值表示就绪,无需将就绪复制到
bool
,然后将结果复制到某处这让我很困惑:
int get() { _fut.wait(); return _result.load(); }
等待未来,然后 return 就是
_result
,你知道你发明_ready
的目的是什么吗?你知道
std::async
不是 Boost ASIO 的一部分吗?事实上,它不能很好地工作,因为正如您正确注意到的那样,它引入了(未指定数量的)线程。总的来说,我的建议是不要使用std::async
(很难正确使用),当然也不要使用 ASIO当您看到相同的变量名称 var1、var2、var3 时,是时候重构您的代码了(如果它包含数据成员,则重构为函数或 类):
std::deque<io::steady_timer> timers; for (int i = 1; i <= 3; ++i) { auto& timer = timers.emplace_back(context, std::chrono::seconds(1+i)); timer.async_wait([i](error_code ec) { std::cout << "timer " << i << ", thread name: " << std::this_thread::get_id() << std::endl; }); }
而不是线程向量,考虑
boost::thread_group
或者boost::asio::thread_pool
.如果手动运行IO线程,记得处理异常(Should the exception thrown by boost::asio::io_service::run() be caught?),所以
boost::thread_group threads; for (int n = 0; n < 2; ++n) { threads.create_thread([&] { context.run(); }); } threads.join_all();
或者确实
io::thread_pool context(2); context.join();
这是非常效率低下
while (!_ready) { std::this_thread::yield(); }
只需设置未来值以表示它已准备就绪:
using namespace std
通常不是一个好主意 (Why is "using namespace std;" considered bad practice?)
演示
这是我对问题代码的扩展但简化的看法:
#include <boost/asio.hpp>
#include <deque>
#include <future>
#include <iostream>
#include <thread>
namespace io = boost::asio;
using namespace std::chrono_literals;
using boost::system::error_code;
// not very useful in practice, but for debug output in main
std::ostream& debug(error_code);
template <typename Fut> bool is_ready(Fut const& fut) {
return fut.wait_for(0s) == std::future_status::ready;
}
int main() {
std::promise<int> reply;
std::shared_future got_value = reply.get_future();
io::thread_pool context(2);
std::deque<io::steady_timer> timers;
for (int i = 1; i <= 10; ++i) {
timers //
.emplace_back(context, i * 1s)
.async_wait([&got_value](error_code ec) {
if (is_ready(got_value))
debug(ec) << " Reply:" << got_value.get() << std::endl;
else
debug(ec) << " (reply not ready)" << std::endl;
});
}
timers //
.emplace_back(context, 4'500ms)
.async_wait([&reply](error_code ec) {
debug(ec) << " setting value" << std::endl;
reply.set_value(1337);
});
context.join();
}
int friendly_thread_id() {
return std::hash<std::thread::id>{}(std::this_thread::get_id()) % 256;
}
#include <iomanip>
std::ostream& debug(error_code ec) {
auto now = std::chrono::system_clock::now;
static auto program_start = now();
return std::cout //
<< ((now() - program_start) / 1ms) << "ms\t"
<< "thread:" << std::hex << std::setfill('0') << std::showbase
<< std::setw(2) << friendly_thread_id() << std::dec << " ";
}
#include <iomanip>
std::ostream& debug(error_code ec) {
auto now = std::chrono::system_clock::now;
static auto program_start = now();
return std::cout //
<< ((now() - program_start) / 1ms) << "ms\t"
<< "thread:" << std::hex << std::setfill('0') << std::showbase
<< std::setw(2) << pretty_thread_id() << std::dec << " ";
}
版画
0ms thread:0x5f (reply not ready)
999ms thread:0xf3 (reply not ready)
1999ms thread:0x5f (reply not ready)
2999ms thread:0x5f (reply not ready)
3499ms thread:0xf3 setting value
3999ms thread:0x5f Reply:1337
4999ms thread:0xf3 Reply:1337
5999ms thread:0xf3 Reply:1337
6999ms thread:0xf3 Reply:1337
7999ms thread:0xf3 Reply:1337
8999ms thread:0xf3 Reply:1337