MQTT PAHO - 用于确认成功消息传递的 MessageId

MQTT PAHO - MessageId for confirmation of successful message delivery

我在 Java 中使用 org.eclipse.paho.client.mqttv3 版本 1.2.0 开发了一个应用程序。通过 iMqttDeliveryToken 的 messageID 来识别发送到 mqtt broker 的消息。

第 1 步 - 发布消息:

ObjectMapper objectMapper = new ObjectMapper();
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(objectMapper.writeValueAsString(myObject).getBytes()); 
mqttMessage.setQos(1);
IMqttDeliveryToken iMqttDeliveryToken = this.client.publish("/myTopic", mqttMessage);

步骤 2 - 在数据库中保存消息:

从 IMqttDeliveryToken 中我得到了 messageID。我用它来保存和识别数据库中的消息。

第 3 步 - 等待调用 deliveryComplete 回调:

这为我提供了相同的 IMqttDeliveryToken,我可以在其中再次获取 messageId。

@Override
   public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
      // delete the database entry via messageId from database
}

问题是第 3 步可能比第 2 步快。所以回调在我的条目保存在数据库中之前被调用。我需要在发送消息之前知道 messageId 以在调用回调之前保存它。我无法自己生成 messageId 并像这样设置它:

mqttMessage.setId(555);

MQTT 生成自己的 messageId。所以我的问题:

  1. 是否可以设置自己的messageId?
  2. 我可以在发布前获取mqtt客户端生成的messageId吗?

不要使用 Paho 库生成的消息的 MQTT id - 因为它

  1. 无法满足您的需求
  2. 如果您发送大量消息,可能会重复发送。

相反,使用您自己的 ID(甚至可能由您的数据库自动生成)并将其作为用户定义的 上下文对象 传递,当 publishing:

Long databaseId = 42;
ObjectMapper objectMapper = new ObjectMapper();
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(objectMapper.writeValueAsString(myObject).getBytes()); 
mqttMessage.setQos(1);
this.client.publish("/myTopic", mqttMessage, databaseId, mPublishCallback);

稍后您可以在发布回调方法中检索 id:

private final IMqttActionListener mPublishCallback = new IMqttActionListener() {
    @Override
    public void onSuccess(IMqttToken publishToken) {
        Long databaseId = (Long) publishToken.getUserContext();
    }

    @Override
    public void onFailure(IMqttToken publishToken, Throwable ex) {
        Long databaseId = (Long) publishToken.getUserContext();
    }
};

另外,您使用的是同步客户端吗?我更喜欢使用 IMqttAsyncClient