MQTT 客户端在消息发布期间无限期等待

MQTT client waits indefinitely during publish of message

我尝试使用 paho 库实现一个异步 MQTT 客户端,它接收关于主题“请求”的消息,制定一个字符串并将响应放在主题“响应”上。我使用回调来处理传入的消息。

#include "mqtt/async_client.h"  
#include "mqtt/topic.h"

const std::string SERVER_ADDRESS {"tcp://localhost:2883"};
const std::string CLIENT_ID {"test_client"};

class TestCallback : public virtual mqtt::callback
{
    // the mqtt client
    mqtt::async_client& cli_;

    // (re)connection success
    void connected(const std::string& cause) override
    {
        cli_.subscribe("request", 0);
    }

   // callback for when a message arrives.
    void message_arrived(mqtt::const_message_ptr msg) override 
    {
        if( msg->get_topic() == "request" )
        {   
            /* format response message here and put it into (string) msg */

            mqtt::message_ptr pubmsg = mqtt::make_message("response", msg);
            pubmsg->set_qos(2);

            //// PROBLEMATIC CODE ////
            cli_.publish(pubmsg)->wait();
            //////////////////////////
        }
    }

public:
    TestCallback(mqtt::async_client& cli)
        : cli_(cli) {}
};



int main(int argc, char** argv)
{    
    mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID);
    TestCallback cb(cli);
    cli.set_callback(cb);

    mqtt::connect_options connOpts = mqtt::connect_options_builder()
        .clean_session(false)
        .automatic_reconnect()
        .finalize();

    try
    {
        cli.connect(connOpts)->wait();
    }
    catch (const mqtt::exception& exc)
    {
        std::cerr << "[ERROR] " << exc.what() << std::endl;
        return 1;
    }

    // run until the application is shut down
    while (std::tolower(std::cin.get()) != 'q')
            ;

    try
    {
        cli.disconnect()->wait();
    }
    catch (const mqtt::exception& exc)
    {
        std::cerr << "[ERROR] " << exc.what() << std::endl;
        return 1;
    }

    return 0;
}

当我尝试发布响应消息时出现问题,因为客户端似乎在无限期地等待。对此负责的是 wait 函数,该函数用于标记以跟踪已发布消息 (reference) 的状态。据我了解,这必须完成,尤其是在使用更高级别的 QoS 时,因此请确保一切顺利。

删除对 wait() 的调用后,它按预期工作。但是我不确定这样是否能保证消息的正确发布。

正确的做法是什么?

这里我来猜一猜,因为我真的不知道 C++ 中的 async 是如何工作的。

MQTT 客户端有一个消息处理线程,它处理所有传入和传出的 TCP 数据包,因为它们 arrive/depart 在套接字上。当新的 MQTT 消息到达时,它会调用消息处理程序回调 (message_arrived),您可以在其中调用 publishwait 来完成它。但是因为对 wait 的调用实际上阻塞了 message_arrived 消息处理线程无法继续。这意味着它无法处理 publish 完成所需的 3 条腿 QOS2 握手,因此它挂起。

我还猜测,如果您将发布更改为 QOS 0,它会完成,但也会因 QOS 1 而失败,因为这需要消息处理线程 send/receive 多条消息才能继续。

不等待发布完成可能是正确的解决方案。

是的,@hardillb 是对的:问题是您无法从回调中对库进行阻塞调用。而wait()是一个阻塞调用,所以它死锁了回调线程。

有一个线程处理来自 MQTT 连接的传入数据包,该线程用于调用回调。当您在 QoS 1 发布上调用 wait() 时,它会阻止输入处理,因此无法处理 PUBACK 以完成等待。

如果您要使用回调,则需要“全力以赴”并使用额外的回调来指示 success/failure 发布完成。

老实说,我从来不是 callback-driven 异步 I/O 的忠实粉丝;这很混乱,并且给应用程序带来了沉重的线程同步负担。但 C++ 库的最初目标是使其类似于早期的 IBM Java 库。

我更喜欢 future/promise (async/await) 风格。我认为 if/when 有一个经过改进的 v2.0 库,它将实现该样式。