Eclipse Paho MQTT 客户端:如何检查现有连接?

Eclipse Paho MQTT Client: How to check for existing connection?

在 Eclipse Paho MQTT 网站上,开发人员提供了一个执行以下操作的客户端示例 (http://www.eclipse.org/paho/files/mqttdoc/MQTTClient/html/pubsync.html):

  1. 使用指定参数创建客户端对象
  2. 使用指定的连接选项连接客户端
  3. 发布 MQTT 消息
  4. 断开客户端
  5. 销毁客户端对象

如果您只想发布一条消息,这很有效。

在我的代码中,我有一个函数包含与上述示例中几乎相同的代码,但是,该函数从 main() 中重复调用,因为我需要一个接一个地发布大量消息.问题是,如果我完全按照示例中的方式使用代码,则每次调用我的函数时都会创建一个新连接,并在不久后销毁。只要反复调用该函数,这种情况就会一次又一次地发生,造成巨大的开销。

有没有办法检查是否已经创建了客户端对象,如果已经创建,则不要再次创建而是使用现有的?

根据我的理解,MQTTClient_isConnected() 函数应该这样做:https://www.eclipse.org/paho/files/mqttdoc/MQTTClient/html/_m_q_t_t_client_8h.html#ad9e40bdb7149ee3e5d075db7f51a735f 但是如果我这样尝试,我会得到一个分段错误:

if (!MQTTClient_isConnected(client)) {
    MQTTClient_create(&client, mqtt.addr, CLIENT_ID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.username = TOKEN;

    if (MQTTClient_connect(client, &conn_opts) != MQTTCLIENT_SUCCESS) {
        printf("\n==> Connection to MQTT Broker failed.\n");
        MQTTClient_destroy(&client);
        exit(EXIT_FAILURE);
    }
}

[编辑]

这是一个简单的演示代码,可以更好地说明我要完成的工作:

#include <stdio.h>
#include <MQTTClient.h>

MQTTClient client;

void publish_MQTT() {
    MQTTClient_connectOptions conn_opts =  MQTTClient_connectOptions_initializer;
    MQTTClient_message pubmsg = MQTTClient_message_initializer;
    MQTTClient_deliveryToken token;
    char *payload = (char *)calloc(1024, sizeof(char));

    strcpy(payload, "hello");

    printf("DEBUG_BEFORE >> MQTTClient_isConnected(client) = %d\n", MQTTClient_isConnected(client)); // DEBUG OUTPUT

    if (!MQTTClient_isConnected(client)) {
        MQTTClient_create(&client, addr, CLIENT_ID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
        conn_opts.keepAliveInterval = 20;
        conn_opts.cleansession = 1;
        conn_opts.username = TOKEN;

        if (MQTTClient_connect(client, &conn_opts) != MQTTCLIENT_SUCCESS) {
            fprintf(stderr, RED "\n==> Connection to MQTT Broker failed.\n" RESET_CL);
            MQTTClient_destroy(&client);
            free(payload);
            exit(EXIT_FAILURE);
        }
    }

    printf("DEBUG_AFTER >> MQTTClient_isConnected(client) = %d\n", MQTTClient_isConnected(client)); // DEBUG OUTPUT

    pubmsg.payload = payload;
    pubmsg.payloadlen = strlen(payload);
    pubmsg.qos = QOS;
    pubmsg.retained = 0;

    MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
    MQTTClient_waitForCompletion(client, token, TIMEOUT);

    //MQTTClient_disconnect(client, 10000);
    //MQTTClient_destroy(&client);
    free(payload);
}

int main(void) {
    for (i=0; i<1000; i++) {
        publish_MQTT();
    }

    return 0;
}

请忽略 addr 参数从未被指定(在我的真实代码中是)或者在 publish_MQTT() 函数中指定消息是非常无用的事实(在我的真实代码中,数据从 main() 传递到该函数)。

您可能正在设置 "clean session flag",什么意思:“ 如果 ClientId 表示已经连接到服务器的客户端,则服务器必须断开现有客户端 [MQTT-3.1.4-2]。”(来自 mqtt 标准)。因此您的客户端已断开连接(现有客户端)。

示例中的代码似乎是合理的。看起来传递函数参数有问题。例如,如果函数需要地址,而您正在给对象本身。

Morze 从标准: "3.2.2.2 会话存在 位置:连接确认标志的位 0。

如果服务器接受 CleanSession 设置为 1 的连接,除了在 CONNACK 数据包中设置零 return 代码外,服务器还必须在 CONNACK 数据包中将 Session Present 设置为 0 [MQTT-3.2. 2-1].

如果服务器接受 CleanSession 设置为 0 的连接,则 Session Present 中设置的值取决于服务器是否已经为提供的客户端 ID 存储会话状态。如果服务器存储了会话状态,它必须在 CONNACK 数据包 [MQTT-3.2.2-2] 中将 Session Present 设置为 1。如果服务器没有存储会话状态,它必须在 CONNACK 数据包中将会话存在设置为 0。这是在 CONNACK 数据包中设置零 return 代码的补充。

我想通了:显然,原始帖子中的示例代码绝对没有问题。

事实证明,我一次又一次地将 MQTT 服务器的端口附加到 addr 参数(在此处未显示的代码部分,因为我不怀疑错误的来源在那里),每次调用 publish_MQTT() 函数时。这使得 addr char 字符串增长并最终超过指定的长度,从而导致 SegFault。

这样一切都按预期工作:

printf("\nADDR = %s\n\n", addr); // DEBUG OUTPUT

if (!MQTTClient_isConnected(client)) {
    strcat(strcat(addr, ":"), pt); // This line needed to be placed here, not before that if block

    MQTTClient_create(&client, addr, CLIENT_ID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.username = TOKEN;

    if (MQTTClient_connect(client, &conn_opts) != MQTTCLIENT_SUCCESS) {
        printf("\n==> Connection to MQTT Broker failed.\n");
        MQTTClient_destroy(&client);
        free(payload);
        exit(EXIT_FAILURE);
    }
}