MQTT 客户端在消息发布期间无限期等待
MQTT client waits indefinitely during publish of message
我尝试使用 paho 库实现一个异步 MQTT 客户端,它接收关于主题“请求”的消息,制定一个字符串并将响应放在主题“响应”上。我使用回调来处理传入的消息。
#include "mqtt/async_client.h"
#include "mqtt/topic.h"
const std::string SERVER_ADDRESS {"tcp://localhost:2883"};
const std::string CLIENT_ID {"test_client"};
class TestCallback : public virtual mqtt::callback
{
// the mqtt client
mqtt::async_client& cli_;
// (re)connection success
void connected(const std::string& cause) override
{
cli_.subscribe("request", 0);
}
// callback for when a message arrives.
void message_arrived(mqtt::const_message_ptr msg) override
{
if( msg->get_topic() == "request" )
{
/* format response message here and put it into (string) msg */
mqtt::message_ptr pubmsg = mqtt::make_message("response", msg);
pubmsg->set_qos(2);
//// PROBLEMATIC CODE ////
cli_.publish(pubmsg)->wait();
//////////////////////////
}
}
public:
TestCallback(mqtt::async_client& cli)
: cli_(cli) {}
};
int main(int argc, char** argv)
{
mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID);
TestCallback cb(cli);
cli.set_callback(cb);
mqtt::connect_options connOpts = mqtt::connect_options_builder()
.clean_session(false)
.automatic_reconnect()
.finalize();
try
{
cli.connect(connOpts)->wait();
}
catch (const mqtt::exception& exc)
{
std::cerr << "[ERROR] " << exc.what() << std::endl;
return 1;
}
// run until the application is shut down
while (std::tolower(std::cin.get()) != 'q')
;
try
{
cli.disconnect()->wait();
}
catch (const mqtt::exception& exc)
{
std::cerr << "[ERROR] " << exc.what() << std::endl;
return 1;
}
return 0;
}
当我尝试发布响应消息时出现问题,因为客户端似乎在无限期地等待。对此负责的是 wait
函数,该函数用于标记以跟踪已发布消息 (reference) 的状态。据我了解,这必须完成,尤其是在使用更高级别的 QoS 时,因此请确保一切顺利。
删除对 wait()
的调用后,它按预期工作。但是我不确定这样是否能保证消息的正确发布。
正确的做法是什么?
这里我来猜一猜,因为我真的不知道 C++ 中的 async 是如何工作的。
MQTT 客户端有一个消息处理线程,它处理所有传入和传出的 TCP 数据包,因为它们 arrive/depart 在套接字上。当新的 MQTT 消息到达时,它会调用消息处理程序回调 (message_arrived
),您可以在其中调用 publish
和 wait
来完成它。但是因为对 wait
的调用实际上阻塞了 message_arrived
消息处理线程无法继续。这意味着它无法处理 publish
完成所需的 3 条腿 QOS2 握手,因此它挂起。
我还猜测,如果您将发布更改为 QOS 0,它会完成,但也会因 QOS 1 而失败,因为这需要消息处理线程 send/receive 多条消息才能继续。
不等待发布完成可能是正确的解决方案。
是的,@hardillb 是对的:问题是您无法从回调中对库进行阻塞调用。而wait()
是一个阻塞调用,所以它死锁了回调线程。
有一个线程处理来自 MQTT 连接的传入数据包,该线程用于调用回调。当您在 QoS 1 发布上调用 wait()
时,它会阻止输入处理,因此无法处理 PUBACK 以完成等待。
如果您要使用回调,则需要“全力以赴”并使用额外的回调来指示 success/failure 发布完成。
老实说,我从来不是 callback-driven 异步 I/O 的忠实粉丝;这很混乱,并且给应用程序带来了沉重的线程同步负担。但 C++ 库的最初目标是使其类似于早期的 IBM Java 库。
我更喜欢 future/promise (async/await) 风格。我认为 if/when 有一个经过改进的 v2.0 库,它将实现该样式。
我尝试使用 paho 库实现一个异步 MQTT 客户端,它接收关于主题“请求”的消息,制定一个字符串并将响应放在主题“响应”上。我使用回调来处理传入的消息。
#include "mqtt/async_client.h"
#include "mqtt/topic.h"
const std::string SERVER_ADDRESS {"tcp://localhost:2883"};
const std::string CLIENT_ID {"test_client"};
class TestCallback : public virtual mqtt::callback
{
// the mqtt client
mqtt::async_client& cli_;
// (re)connection success
void connected(const std::string& cause) override
{
cli_.subscribe("request", 0);
}
// callback for when a message arrives.
void message_arrived(mqtt::const_message_ptr msg) override
{
if( msg->get_topic() == "request" )
{
/* format response message here and put it into (string) msg */
mqtt::message_ptr pubmsg = mqtt::make_message("response", msg);
pubmsg->set_qos(2);
//// PROBLEMATIC CODE ////
cli_.publish(pubmsg)->wait();
//////////////////////////
}
}
public:
TestCallback(mqtt::async_client& cli)
: cli_(cli) {}
};
int main(int argc, char** argv)
{
mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID);
TestCallback cb(cli);
cli.set_callback(cb);
mqtt::connect_options connOpts = mqtt::connect_options_builder()
.clean_session(false)
.automatic_reconnect()
.finalize();
try
{
cli.connect(connOpts)->wait();
}
catch (const mqtt::exception& exc)
{
std::cerr << "[ERROR] " << exc.what() << std::endl;
return 1;
}
// run until the application is shut down
while (std::tolower(std::cin.get()) != 'q')
;
try
{
cli.disconnect()->wait();
}
catch (const mqtt::exception& exc)
{
std::cerr << "[ERROR] " << exc.what() << std::endl;
return 1;
}
return 0;
}
当我尝试发布响应消息时出现问题,因为客户端似乎在无限期地等待。对此负责的是 wait
函数,该函数用于标记以跟踪已发布消息 (reference) 的状态。据我了解,这必须完成,尤其是在使用更高级别的 QoS 时,因此请确保一切顺利。
删除对 wait()
的调用后,它按预期工作。但是我不确定这样是否能保证消息的正确发布。
正确的做法是什么?
这里我来猜一猜,因为我真的不知道 C++ 中的 async 是如何工作的。
MQTT 客户端有一个消息处理线程,它处理所有传入和传出的 TCP 数据包,因为它们 arrive/depart 在套接字上。当新的 MQTT 消息到达时,它会调用消息处理程序回调 (message_arrived
),您可以在其中调用 publish
和 wait
来完成它。但是因为对 wait
的调用实际上阻塞了 message_arrived
消息处理线程无法继续。这意味着它无法处理 publish
完成所需的 3 条腿 QOS2 握手,因此它挂起。
我还猜测,如果您将发布更改为 QOS 0,它会完成,但也会因 QOS 1 而失败,因为这需要消息处理线程 send/receive 多条消息才能继续。
不等待发布完成可能是正确的解决方案。
是的,@hardillb 是对的:问题是您无法从回调中对库进行阻塞调用。而wait()
是一个阻塞调用,所以它死锁了回调线程。
有一个线程处理来自 MQTT 连接的传入数据包,该线程用于调用回调。当您在 QoS 1 发布上调用 wait()
时,它会阻止输入处理,因此无法处理 PUBACK 以完成等待。
如果您要使用回调,则需要“全力以赴”并使用额外的回调来指示 success/failure 发布完成。
老实说,我从来不是 callback-driven 异步 I/O 的忠实粉丝;这很混乱,并且给应用程序带来了沉重的线程同步负担。但 C++ 库的最初目标是使其类似于早期的 IBM Java 库。
我更喜欢 future/promise (async/await) 风格。我认为 if/when 有一个经过改进的 v2.0 库,它将实现该样式。