对 boost::asio::io_context::run 的困惑
Confusion about boost::asio::io_context::run
我目前正在做一个使用 MQTT 协议进行通信的项目。
在专用文件中有一个会话 class,它基本上只是设置发布处理程序,即调用的回调,当此客户端收到消息时(处理程序检查主题是否匹配“ZEUXX/var",然后反序列化框架的二进制内容,随后取消订阅该主题):
session.hpp:
class Session
{
public:
Session()
{
comobj = MQTT_NS::make_sync_client(ioc, "localhost", "1883", MQTT_NS::protocol_version::v5);
using packet_id_t = typename std::remove_reference_t<decltype(*comobj)>::packet_id_t;
// Setup client
comobj->set_client_id(clientId);
comobj->set_clean_session(true);
/* If someone sends commands to this client */
comobj->set_v5_publish_handler( // use v5 handler
[&](MQTT_NS::optional<packet_id_t> /*packet_id*/,
MQTT_NS::publish_options pubopts,
MQTT_NS::buffer topic_name,
MQTT_NS::buffer contents,
MQTT_NS::v5::properties /*props*/) {
std::cout << "[client] publish received. "
<< " dup: " << pubopts.get_dup()
<< " qos: " << pubopts.get_qos()
<< " retain: " << pubopts.get_retain() << std::endl;
std::string_view topic = std::string_view(topic_name.data(), topic_name.size());
std::cout << " -> topic: " << topic << std::endl;
else if (topic.substr(0, 9) == "ZEUXX/var")
{
std::cout << "[client] reading variable name: " << topic.substr(10, topic.size() - 9) << std::endl;
auto result = 99; // dummy variable, normally an std::variant of float, int32_t uint8_t
// obtained by deserialzing the binary content of the frame
std::cout << comobj->unsubscribe(std::string{topic});
}
return true;
});
}
void readvar(const std::string &varname)
{
comobj->publish(serialnumber + "/read", varname, MQTT_NS::qos::at_most_once);
comobj->subscribe(serialnumber + "/var/" + varname, MQTT_NS::qos::at_most_once);
}
void couple()
{
comobj->connect();
ioc.run();
}
void decouple()
{
comobj->disconnect();
std::cout << "[client] disconnected..." << std::endl;
}
private:
std::shared_ptr<
MQTT_NS::callable_overlay<
MQTT_NS::sync_client<MQTT_NS::tcp_endpoint<as::ip::tcp::socket, as::io_context::strand>>>>
comobj;
boost::asio::io_context ioc;
};
客户端基于 boost::asio::io_context
对象,这恰好是我困惑的根源。在我的主文件中,我有以下代码。
main.cpp:
#include "session.hpp"
int main()
{
Session session;
session.couple();
session.readvar("speedcpu");
}
本质上,这会创建 class 会话的一个实例,并且情侣成员调用 boost::asio::io_context::run
成员。这将运行 io_context 对象的事件处理循环并阻塞主线程,即永远不会到达主函数的第三行。
我想启动一个连接 (session.couple),然后执行我的发布和订阅命令 (session.readvar)。我的问题是:我该如何正确地做到这一点?
从概念上讲,我的目标最好用以下 python-代码表达:
client.connect("localhost", 1883)
# client.loop_forever() that's what happens at the moment, the program
# doesn't continue from here
# The process loop get's started, however it does not block the program and
# one can send publish command subsequently.
client.loop_start()
while True:
client.publish("ZEUXX/read", "testread")
time.sleep(20)
运行 单独线程中的 io_context 对象似乎不像我尝试的那样工作,关于如何解决这个问题有什么建议吗?我尝试的是以下内容:
适应 session.hpp
// Adapt the couple function to run io_context in a separate thread
void couple()
{
comobj->connect();
std::thread t(boost::bind(&boost::asio::io_context::run, &ioc));
t.detach();
}
main.cpp
中的调整
int main(int argc, char** argv)
{
Session session;
session.couple();
std::cout << "successfully started io context in separate thread" << std::endl;
session.readvar("speedcpu");
}
现在到达 std::cout 行,即程序不会卡在 class 的一对成员中 io_context.run()。但是,在这一行之后,我立即收到错误消息:“网络连接被本地系统中止”。
有趣的是,当我使用 t.join()
而不是 t.detach()
时,没有错误,但是我使用 t.join()
时的行为与调用 io_context.run()
直接,即阻塞程序。
当 io_context
运行 失业时,它 return 来自 run()
。
如果您不 post 任何工作,run()
将始终立即 return。任何后续 run()
也会立即 returns,即使新作品是 posted。
要在完成后重新使用 io_context
,请使用 io_context.reset()
。在你的情况下,最好
- 使用 work guard (https://www.boost.org/doc/libs/1_73_0/doc/html/boost_asio/reference/executor_work_guard.html),查看许多库示例
- 如果你已经在后台线程运行 运行 中
couple()
中的 ioc
甚至不要“运行”
如果您需要同步行为,请不要 运行 它在后台线程上。
另外请记住,您需要提供正常关闭,这对于分离线程来说更加困难 - 毕竟,现在您无法 join()
它知道它何时退出。
鉴于您对现有答案的评论:
io_context.run()
never return because it never runs out of work (it is being kept alive from the MQTT server). As a result, the thread gets blocked as soon as I enter the run()
method and I cannot send any publish and subscribe frames anymore. That was when I thought it would be clever to run the io_context
in a separate thread to not block the main thread. However, when I detach
this separate thread, the connection runs into an error, if I use join
however, it works fine but the main thread gets blocked again.
我假设您知道如何在单独的线程中成功获得此 运行ning。您面临的“问题”是,由于 io_context
不会 运行 停止工作,因此调用 thread::join
也会阻塞,因为它将等待线程停止执行。最简单的解决方案是在 thread::join
之前调用 io_context::stop
。来自 the official docs:
This function does not block, but instead simply signals the io_context
to stop. All invocations of its run()
or run_one()
member functions should return as soon as possible. Subsequent calls to run()
, run_one()
, poll()
or poll_one()
will return immediately until restart()
is called.
也就是说,调用 io_context::stop
将导致 io_context::run
调用 return(“尽快”),从而使相关线程可连接。
您还需要将对 thread
的引用保存在某处(可能作为 Session
class 的属性)并且只调用 thread::join
在之后你已经完成了剩下的工作(例如调用Session::readvar
)而不是在Session::couple
.
我目前正在做一个使用 MQTT 协议进行通信的项目。
在专用文件中有一个会话 class,它基本上只是设置发布处理程序,即调用的回调,当此客户端收到消息时(处理程序检查主题是否匹配“ZEUXX/var",然后反序列化框架的二进制内容,随后取消订阅该主题):
session.hpp:
class Session
{
public:
Session()
{
comobj = MQTT_NS::make_sync_client(ioc, "localhost", "1883", MQTT_NS::protocol_version::v5);
using packet_id_t = typename std::remove_reference_t<decltype(*comobj)>::packet_id_t;
// Setup client
comobj->set_client_id(clientId);
comobj->set_clean_session(true);
/* If someone sends commands to this client */
comobj->set_v5_publish_handler( // use v5 handler
[&](MQTT_NS::optional<packet_id_t> /*packet_id*/,
MQTT_NS::publish_options pubopts,
MQTT_NS::buffer topic_name,
MQTT_NS::buffer contents,
MQTT_NS::v5::properties /*props*/) {
std::cout << "[client] publish received. "
<< " dup: " << pubopts.get_dup()
<< " qos: " << pubopts.get_qos()
<< " retain: " << pubopts.get_retain() << std::endl;
std::string_view topic = std::string_view(topic_name.data(), topic_name.size());
std::cout << " -> topic: " << topic << std::endl;
else if (topic.substr(0, 9) == "ZEUXX/var")
{
std::cout << "[client] reading variable name: " << topic.substr(10, topic.size() - 9) << std::endl;
auto result = 99; // dummy variable, normally an std::variant of float, int32_t uint8_t
// obtained by deserialzing the binary content of the frame
std::cout << comobj->unsubscribe(std::string{topic});
}
return true;
});
}
void readvar(const std::string &varname)
{
comobj->publish(serialnumber + "/read", varname, MQTT_NS::qos::at_most_once);
comobj->subscribe(serialnumber + "/var/" + varname, MQTT_NS::qos::at_most_once);
}
void couple()
{
comobj->connect();
ioc.run();
}
void decouple()
{
comobj->disconnect();
std::cout << "[client] disconnected..." << std::endl;
}
private:
std::shared_ptr<
MQTT_NS::callable_overlay<
MQTT_NS::sync_client<MQTT_NS::tcp_endpoint<as::ip::tcp::socket, as::io_context::strand>>>>
comobj;
boost::asio::io_context ioc;
};
客户端基于 boost::asio::io_context
对象,这恰好是我困惑的根源。在我的主文件中,我有以下代码。
main.cpp:
#include "session.hpp"
int main()
{
Session session;
session.couple();
session.readvar("speedcpu");
}
本质上,这会创建 class 会话的一个实例,并且情侣成员调用 boost::asio::io_context::run
成员。这将运行 io_context 对象的事件处理循环并阻塞主线程,即永远不会到达主函数的第三行。
我想启动一个连接 (session.couple),然后执行我的发布和订阅命令 (session.readvar)。我的问题是:我该如何正确地做到这一点?
从概念上讲,我的目标最好用以下 python-代码表达:
client.connect("localhost", 1883)
# client.loop_forever() that's what happens at the moment, the program
# doesn't continue from here
# The process loop get's started, however it does not block the program and
# one can send publish command subsequently.
client.loop_start()
while True:
client.publish("ZEUXX/read", "testread")
time.sleep(20)
运行 单独线程中的 io_context 对象似乎不像我尝试的那样工作,关于如何解决这个问题有什么建议吗?我尝试的是以下内容:
适应 session.hpp
// Adapt the couple function to run io_context in a separate thread
void couple()
{
comobj->connect();
std::thread t(boost::bind(&boost::asio::io_context::run, &ioc));
t.detach();
}
main.cpp
中的调整int main(int argc, char** argv)
{
Session session;
session.couple();
std::cout << "successfully started io context in separate thread" << std::endl;
session.readvar("speedcpu");
}
现在到达 std::cout 行,即程序不会卡在 class 的一对成员中 io_context.run()。但是,在这一行之后,我立即收到错误消息:“网络连接被本地系统中止”。
有趣的是,当我使用 t.join()
而不是 t.detach()
时,没有错误,但是我使用 t.join()
时的行为与调用 io_context.run()
直接,即阻塞程序。
当 io_context
运行 失业时,它 return 来自 run()
。
如果您不 post 任何工作,run()
将始终立即 return。任何后续 run()
也会立即 returns,即使新作品是 posted。
要在完成后重新使用 io_context
,请使用 io_context.reset()
。在你的情况下,最好
- 使用 work guard (https://www.boost.org/doc/libs/1_73_0/doc/html/boost_asio/reference/executor_work_guard.html),查看许多库示例
- 如果你已经在后台线程运行 运行 中
couple()
中的ioc
甚至不要“运行”
如果您需要同步行为,请不要 运行 它在后台线程上。
另外请记住,您需要提供正常关闭,这对于分离线程来说更加困难 - 毕竟,现在您无法 join()
它知道它何时退出。
鉴于您对现有答案的评论:
io_context.run()
never return because it never runs out of work (it is being kept alive from the MQTT server). As a result, the thread gets blocked as soon as I enter therun()
method and I cannot send any publish and subscribe frames anymore. That was when I thought it would be clever to run theio_context
in a separate thread to not block the main thread. However, when Idetach
this separate thread, the connection runs into an error, if I usejoin
however, it works fine but the main thread gets blocked again.
我假设您知道如何在单独的线程中成功获得此 运行ning。您面临的“问题”是,由于 io_context
不会 运行 停止工作,因此调用 thread::join
也会阻塞,因为它将等待线程停止执行。最简单的解决方案是在 thread::join
之前调用 io_context::stop
。来自 the official docs:
This function does not block, but instead simply signals the
io_context
to stop. All invocations of itsrun()
orrun_one()
member functions should return as soon as possible. Subsequent calls torun()
,run_one()
,poll()
orpoll_one()
will return immediately untilrestart()
is called.
也就是说,调用 io_context::stop
将导致 io_context::run
调用 return(“尽快”),从而使相关线程可连接。
您还需要将对 thread
的引用保存在某处(可能作为 Session
class 的属性)并且只调用 thread::join
在之后你已经完成了剩下的工作(例如调用Session::readvar
)而不是在Session::couple
.