Eclipse Paho MQTT 客户端:如何检查现有连接?
Eclipse Paho MQTT Client: How to check for existing connection?
在 Eclipse Paho MQTT 网站上,开发人员提供了一个执行以下操作的客户端示例 (http://www.eclipse.org/paho/files/mqttdoc/MQTTClient/html/pubsync.html):
- 使用指定参数创建客户端对象
- 使用指定的连接选项连接客户端
- 发布 MQTT 消息
- 断开客户端
- 销毁客户端对象
如果您只想发布一条消息,这很有效。
在我的代码中,我有一个函数包含与上述示例中几乎相同的代码,但是,该函数从 main() 中重复调用,因为我需要一个接一个地发布大量消息.问题是,如果我完全按照示例中的方式使用代码,则每次调用我的函数时都会创建一个新连接,并在不久后销毁。只要反复调用该函数,这种情况就会一次又一次地发生,造成巨大的开销。
有没有办法检查是否已经创建了客户端对象,如果已经创建,则不要再次创建而是使用现有的?
根据我的理解,MQTTClient_isConnected() 函数应该这样做:https://www.eclipse.org/paho/files/mqttdoc/MQTTClient/html/_m_q_t_t_client_8h.html#ad9e40bdb7149ee3e5d075db7f51a735f
但是如果我这样尝试,我会得到一个分段错误:
if (!MQTTClient_isConnected(client)) {
MQTTClient_create(&client, mqtt.addr, CLIENT_ID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = TOKEN;
if (MQTTClient_connect(client, &conn_opts) != MQTTCLIENT_SUCCESS) {
printf("\n==> Connection to MQTT Broker failed.\n");
MQTTClient_destroy(&client);
exit(EXIT_FAILURE);
}
}
[编辑]
这是一个简单的演示代码,可以更好地说明我要完成的工作:
#include <stdio.h>
#include <MQTTClient.h>
MQTTClient client;
void publish_MQTT() {
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;
char *payload = (char *)calloc(1024, sizeof(char));
strcpy(payload, "hello");
printf("DEBUG_BEFORE >> MQTTClient_isConnected(client) = %d\n", MQTTClient_isConnected(client)); // DEBUG OUTPUT
if (!MQTTClient_isConnected(client)) {
MQTTClient_create(&client, addr, CLIENT_ID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = TOKEN;
if (MQTTClient_connect(client, &conn_opts) != MQTTCLIENT_SUCCESS) {
fprintf(stderr, RED "\n==> Connection to MQTT Broker failed.\n" RESET_CL);
MQTTClient_destroy(&client);
free(payload);
exit(EXIT_FAILURE);
}
}
printf("DEBUG_AFTER >> MQTTClient_isConnected(client) = %d\n", MQTTClient_isConnected(client)); // DEBUG OUTPUT
pubmsg.payload = payload;
pubmsg.payloadlen = strlen(payload);
pubmsg.qos = QOS;
pubmsg.retained = 0;
MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
MQTTClient_waitForCompletion(client, token, TIMEOUT);
//MQTTClient_disconnect(client, 10000);
//MQTTClient_destroy(&client);
free(payload);
}
int main(void) {
for (i=0; i<1000; i++) {
publish_MQTT();
}
return 0;
}
请忽略 addr 参数从未被指定(在我的真实代码中是)或者在 publish_MQTT() 函数中指定消息是非常无用的事实(在我的真实代码中,数据从 main() 传递到该函数)。
您可能正在设置 "clean session flag",什么意思:“
如果 ClientId 表示已经连接到服务器的客户端,则服务器必须断开现有客户端 [MQTT-3.1.4-2]。”(来自 mqtt 标准)。因此您的客户端已断开连接(现有客户端)。
示例中的代码似乎是合理的。看起来传递函数参数有问题。例如,如果函数需要地址,而您正在给对象本身。
Morze 从标准:
"3.2.2.2 会话存在
位置:连接确认标志的位 0。
如果服务器接受 CleanSession 设置为 1 的连接,除了在 CONNACK 数据包中设置零 return 代码外,服务器还必须在 CONNACK 数据包中将 Session Present 设置为 0 [MQTT-3.2. 2-1].
如果服务器接受 CleanSession 设置为 0 的连接,则 Session Present 中设置的值取决于服务器是否已经为提供的客户端 ID 存储会话状态。如果服务器存储了会话状态,它必须在 CONNACK 数据包 [MQTT-3.2.2-2] 中将 Session Present 设置为 1。如果服务器没有存储会话状态,它必须在 CONNACK 数据包中将会话存在设置为 0。这是在 CONNACK 数据包中设置零 return 代码的补充。
我想通了:显然,原始帖子中的示例代码绝对没有问题。
事实证明,我一次又一次地将 MQTT 服务器的端口附加到 addr 参数(在此处未显示的代码部分,因为我不怀疑错误的来源在那里),每次调用 publish_MQTT() 函数时。这使得 addr char 字符串增长并最终超过指定的长度,从而导致 SegFault。
这样一切都按预期工作:
printf("\nADDR = %s\n\n", addr); // DEBUG OUTPUT
if (!MQTTClient_isConnected(client)) {
strcat(strcat(addr, ":"), pt); // This line needed to be placed here, not before that if block
MQTTClient_create(&client, addr, CLIENT_ID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = TOKEN;
if (MQTTClient_connect(client, &conn_opts) != MQTTCLIENT_SUCCESS) {
printf("\n==> Connection to MQTT Broker failed.\n");
MQTTClient_destroy(&client);
free(payload);
exit(EXIT_FAILURE);
}
}
在 Eclipse Paho MQTT 网站上,开发人员提供了一个执行以下操作的客户端示例 (http://www.eclipse.org/paho/files/mqttdoc/MQTTClient/html/pubsync.html):
- 使用指定参数创建客户端对象
- 使用指定的连接选项连接客户端
- 发布 MQTT 消息
- 断开客户端
- 销毁客户端对象
如果您只想发布一条消息,这很有效。
在我的代码中,我有一个函数包含与上述示例中几乎相同的代码,但是,该函数从 main() 中重复调用,因为我需要一个接一个地发布大量消息.问题是,如果我完全按照示例中的方式使用代码,则每次调用我的函数时都会创建一个新连接,并在不久后销毁。只要反复调用该函数,这种情况就会一次又一次地发生,造成巨大的开销。
有没有办法检查是否已经创建了客户端对象,如果已经创建,则不要再次创建而是使用现有的?
根据我的理解,MQTTClient_isConnected() 函数应该这样做:https://www.eclipse.org/paho/files/mqttdoc/MQTTClient/html/_m_q_t_t_client_8h.html#ad9e40bdb7149ee3e5d075db7f51a735f 但是如果我这样尝试,我会得到一个分段错误:
if (!MQTTClient_isConnected(client)) {
MQTTClient_create(&client, mqtt.addr, CLIENT_ID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = TOKEN;
if (MQTTClient_connect(client, &conn_opts) != MQTTCLIENT_SUCCESS) {
printf("\n==> Connection to MQTT Broker failed.\n");
MQTTClient_destroy(&client);
exit(EXIT_FAILURE);
}
}
[编辑]
这是一个简单的演示代码,可以更好地说明我要完成的工作:
#include <stdio.h>
#include <MQTTClient.h>
MQTTClient client;
void publish_MQTT() {
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;
char *payload = (char *)calloc(1024, sizeof(char));
strcpy(payload, "hello");
printf("DEBUG_BEFORE >> MQTTClient_isConnected(client) = %d\n", MQTTClient_isConnected(client)); // DEBUG OUTPUT
if (!MQTTClient_isConnected(client)) {
MQTTClient_create(&client, addr, CLIENT_ID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = TOKEN;
if (MQTTClient_connect(client, &conn_opts) != MQTTCLIENT_SUCCESS) {
fprintf(stderr, RED "\n==> Connection to MQTT Broker failed.\n" RESET_CL);
MQTTClient_destroy(&client);
free(payload);
exit(EXIT_FAILURE);
}
}
printf("DEBUG_AFTER >> MQTTClient_isConnected(client) = %d\n", MQTTClient_isConnected(client)); // DEBUG OUTPUT
pubmsg.payload = payload;
pubmsg.payloadlen = strlen(payload);
pubmsg.qos = QOS;
pubmsg.retained = 0;
MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
MQTTClient_waitForCompletion(client, token, TIMEOUT);
//MQTTClient_disconnect(client, 10000);
//MQTTClient_destroy(&client);
free(payload);
}
int main(void) {
for (i=0; i<1000; i++) {
publish_MQTT();
}
return 0;
}
请忽略 addr 参数从未被指定(在我的真实代码中是)或者在 publish_MQTT() 函数中指定消息是非常无用的事实(在我的真实代码中,数据从 main() 传递到该函数)。
您可能正在设置 "clean session flag",什么意思:“ 如果 ClientId 表示已经连接到服务器的客户端,则服务器必须断开现有客户端 [MQTT-3.1.4-2]。”(来自 mqtt 标准)。因此您的客户端已断开连接(现有客户端)。
示例中的代码似乎是合理的。看起来传递函数参数有问题。例如,如果函数需要地址,而您正在给对象本身。
Morze 从标准: "3.2.2.2 会话存在 位置:连接确认标志的位 0。
如果服务器接受 CleanSession 设置为 1 的连接,除了在 CONNACK 数据包中设置零 return 代码外,服务器还必须在 CONNACK 数据包中将 Session Present 设置为 0 [MQTT-3.2. 2-1].
如果服务器接受 CleanSession 设置为 0 的连接,则 Session Present 中设置的值取决于服务器是否已经为提供的客户端 ID 存储会话状态。如果服务器存储了会话状态,它必须在 CONNACK 数据包 [MQTT-3.2.2-2] 中将 Session Present 设置为 1。如果服务器没有存储会话状态,它必须在 CONNACK 数据包中将会话存在设置为 0。这是在 CONNACK 数据包中设置零 return 代码的补充。
我想通了:显然,原始帖子中的示例代码绝对没有问题。
事实证明,我一次又一次地将 MQTT 服务器的端口附加到 addr 参数(在此处未显示的代码部分,因为我不怀疑错误的来源在那里),每次调用 publish_MQTT() 函数时。这使得 addr char 字符串增长并最终超过指定的长度,从而导致 SegFault。
这样一切都按预期工作:
printf("\nADDR = %s\n\n", addr); // DEBUG OUTPUT
if (!MQTTClient_isConnected(client)) {
strcat(strcat(addr, ":"), pt); // This line needed to be placed here, not before that if block
MQTTClient_create(&client, addr, CLIENT_ID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = TOKEN;
if (MQTTClient_connect(client, &conn_opts) != MQTTCLIENT_SUCCESS) {
printf("\n==> Connection to MQTT Broker failed.\n");
MQTTClient_destroy(&client);
free(payload);
exit(EXIT_FAILURE);
}
}