Libmosquitto publish 不会将所有消息传递到 Azure IoT 中心

Libmosquitto publish doesn't deliver all messages to Azure IoT Hub

我正在尝试每秒向 Azure Iot Hub 内置事件中心发布超过 100 条消息。我正在使用 libmosquitto 1.6.8 库。我正在使用 Azure Iot Hub 的免费套餐,我知道每秒 100 条消息的限制。但这与那个问题无关。我什至无法将一半的消息发布到 AZ Iot Hub。

基本上,我有一个 multimap 中需要发送的多个值的列表。指标列表:

std::multimap< const std::string, std::tuple< const std::string, const std::string, float> > calculatedMetricList;

我将遍历 multimap 并将每个值构建到一个对象负载中,然后将其发送出去。这意味着 mosquitto_publish 方法将被多次调用。

以下是发布消息的代码:

void MosquittoClient::sendDataToUpstreamSystem(){

StatisticalMethod statisticalMethod;
int rc;

MosquittoClient pub_mosq(
    "<IoT Hub Name>.azure-devices.net",
    "<deviceID>", 
    "<username>", 
    "<Password>", 
    "devices/<deviceID>/messages/events/");

printf("Using MQTT to get data payload from host: %s and on port: %d.\r\n", pub_mosq.get_host(), pub_mosq.get_port());
// init the mosquitto lib
mosquitto_lib_init();

// create the mosquito object
struct mosquitto* mosq = mosquitto_new(pub_mosq.get_deviceID(), false, NULL);

// add callback functions
mosquitto_connect_callback_set(mosq, MosquittoClient::connect_callback);
mosquitto_publish_callback_set(mosq, MosquittoClient::publish_callback);
mosquitto_message_callback_set(mosq, MosquittoClient::on_message);
mosquitto_disconnect_callback_set(mosq, MosquittoClient::on_disconnect_callback);

// set mosquitto username, password and options
mosquitto_username_pw_set(mosq, pub_mosq.get_userName(), pub_mosq.get_password());

// specify the certificate to use
std::ifstream infile(pub_mosq.get_certificate());
bool certExists = infile.good();
infile.close();

if (!certExists)
{
    printf("Warning: Could not find file '%s'! The mosquitto loop may fail.\r\n", pub_mosq.get_certificate());
}

printf("Using certificate: %s\r\n", pub_mosq.get_certificate());
mosquitto_tls_set(mosq, pub_mosq.get_certificate(), NULL, NULL, NULL, NULL);

// specify the mqtt version to use
int* option = new int(MQTT_PROTOCOL_V311);
rc = mosquitto_opts_set(mosq, MOSQ_OPT_PROTOCOL_VERSION, option);
if (rc != MOSQ_ERR_SUCCESS)
{
    rc = pub_mosq.mosquitto_error(rc, "Error: opts_set protocol version");
}
else
{
    printf("Setting up options OK\r\n");
}

// connect
printf("Connecting...\r\n");
rc = mosquitto_connect_async(mosq, pub_mosq.get_host(), pub_mosq.get_port(), 4);
if (rc != MOSQ_ERR_SUCCESS)
{
    rc = pub_mosq.mosquitto_error(rc, NULL);
}
else
{
    printf("Connect returned OK\r\n");

    rc = mosquitto_loop_start(mosq);

    if (rc != MOSQ_ERR_SUCCESS)
    {
        rc = pub_mosq.mosquitto_error(rc, NULL);
    }
    else
    {
        do
        {
            for (auto itr = Metrics::calculatedMetricList.begin(); itr != Metrics::calculatedMetricList.end(); itr++) { 
                int msgId = rand();

                std::string test1= itr->first;
                std::string test2 = std::get<0>(itr->second);
                std::string test3= std::get<1>(itr->second); // metric type 
                float value = std::get<2>(itr->second); // value

                DataPayload objectPayload(
                    75162345,
                    496523,
                    test3,
                    value,
                    "test",
                    test1,
                    "test",
                    "test",
                    123,
                    213,
                    23
                );

                objectPayload.setPayload();
                std::string dataPayload = objectPayload.getPayload();

                //DEBUG
                std::cout << "dataPayload: " << dataPayload << std::endl;
                //DEBUG
                std::cout << "dataPayload Size: " << dataPayload.size() << std::endl;

                // once connected, we can publish (send) a Telemetry message
                printf("Publishing to topic: %s\r\n", pub_mosq.get_topic());

                rc = pub_mosq.publishToTopic(mosq, &msgId, dataPayload.size(), (char *)dataPayload.c_str());

                if (rc == MOSQ_ERR_SUCCESS)
                {
                    printf("Publish returned OK\r\n");
                }
                else 
                {               
                    rc = pub_mosq.mosquitto_error(rc, NULL);
                }
            } 

        } while (rc != MOSQ_ERR_SUCCESS);   
    }
}

mosquitto_loop_stop(mosq, true);
mosquitto_destroy(mosq);

mosquitto_lib_cleanup();}

发布方式:

    int MosquittoClient::publishToTopic(struct mosquitto *mosq, int *msgId, int sizeOfData, char *data)
{
    return mosquitto_publish(mosq, msgId, p_topic, sizeOfData, data, 1, true);
}

当运行程序发布所有消息returnok,按控制台显示。但只有一两条消息出现在 Azure IoT 中心端。

下图是IoT Hub的监控,当时只有一条消息通过。 Device Explorer Twin Monitoring

我尝试了很多不同的解决方案,但程序无法发布所有消息。看起来发布方法正在等待完成第一条消息,但迭代正在移动到下一条消息,导致它被丢弃。如果这是导致消息丢失的原因,那么对消息发送进行排序的最佳方法是什么?否则,还有什么可能导致消息被丢弃?

更新

问题是程序没有等到消息成功发布到代理(Azure IoT 中心)。如果 publish_callback 是 returned,您将知道消息是否已成功发布到代理。

void MosquittoClient::publish_callback(struct mosquitto* mosq, void* userdata, int mid)
{
    printf("Publish OK.\r\n");
}

解决方案是在销毁之前让线程休眠,清理调用并在建立连接之前启动 Mosquitto 循环。

sleep(30);
mosquitto_loop_stop(mosq, true);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();

mosquitto_publish() 是异步的:拥有它 return MOSQ_ERR_SUCCESS 仅仅意味着消息的发布已经正确地传递给 Mosquitto 线程。因此,此时您正在排队大量消息,然后让您的程序在有机会实际发送数据包之前终止。

您可以使用 MosquittoClient::publish_callback 回调来检查所有消息是否已有效发送,然后停止循环并终止您的程序。