添加 correlationData 后,MQTT5 消息在 HiveMQ Cloud 上丢失

MQTT5 message is lost on HiveMQ Cloud when correlationData is added

在 Java 服务器应用程序中,我们想要使用 correlationData,它是 MQTT5 的一部分,因此我们可以在对 link 的回复消息中使用它,并在回复是收到。

我正在使用 hivemq-mqtt-client 库 1.2.2 并连接到 HiveMQ Cloud。

连接是这样的:

private Mqtt5AsyncClient client;

public MqttConfig(Environment environment) {
    client = MqttClient.builder()
            .useMqttVersion5()
            .identifier("TestServer")
            .serverHost(MQTT_SERVER)
            .serverPort(MQTT_PORT)
            .sslWithDefaultConfig()
            .automaticReconnectWithDefaultConfig()
            .buildAsync();

    client.connectWith()
            .simpleAuth()
            .username(MQTT_USER)
            .password(MQTT_PASSWORD.getBytes())
            .applySimpleAuth()
            .send()
            .whenComplete((connAck, throwable) -> {
                if (throwable != null) {
                    logger.error("Could not connect to MQTT: {}", throwable.getMessage());
                } else {
                    logger.info("Connected to MQTT: {}", connAck.getReasonCode());
                }
            });
}

public Mqtt5AsyncClient getClient() {
    return client;
}

发送消息是用这个方法完成的:

mqttConfig.getClient()
            .publishWith()
            .topic(destinationTopic)
            //.correlationData(correlationData.getBytes())
            .responseTopic(responseTopic)
            .payload(message.getBytes())
            .qos(MqttQos.AT_LEAST_ONCE)
            .send()
            .whenComplete((result, throwable) -> {
                if (throwable != null) {
                    logger.info("Error sending to '{}': {} - {}", destinationTopic, message, throwable.getMessage());
                } else {
                    logger.info("Message sent to '{}': {} - {}", destinationTopic, message, result);
                }
            });

http://www.hivemq.com/demos/websocket-client/ 和订阅者上监视消息时,只有在带有 `correlationData()' 的行被注释时才会收到消息,如上所示。

无论有无该行,应用程序都会记录一次成功发送,例如启用相关数据:

Message sent to 'server/test': testMessage - MqttQos2Result{publish=MqttPublish{topic=server/test, payload=11byte, qos=EXACTLY_ONCE, retain=false, responseTopic=server/test/reply, correlationData=6byte}, pubRec=MqttPubRec{packetIdentifier=1}}

知道为什么额外的 correlationData 似乎导致它们没有显示在 websocket 测试页面上,并且没有被任何订阅者接收到吗?

作为实验,我使用了 paho 5 库而不是 HiveMQ 库和以下代码,但具有完全相同的行为并且需要禁用该行以查看消息传递:

    MqttProperties properties = new MqttProperties();
    //properties.setCorrelationData(correlationData.getBytes());
    MqttMessage mqttMessage = new MqttMessage();
    mqttMessage.setQos(1);
    mqttMessage.setPayload(message.getBytes());
    mqttMessage.setProperties(properties);
    try {
        mqttConfig.getClient().publish(destinationTopic, mqttMessage);
        logger.info("Message was sent to '{}': {}", destinationTopic, message);
    } catch (MqttException ex) {
        logger.error("Error sending to '{}': {} - {}", destinationTopic, message, ex.getMessage());
    }

HiveMQ Cloud 现已修复此行为。