MQTT PAHO - 用于确认成功消息传递的 MessageId
MQTT PAHO - MessageId for confirmation of successful message delivery
我在 Java 中使用 org.eclipse.paho.client.mqttv3 版本 1.2.0 开发了一个应用程序。通过 iMqttDeliveryToken 的 messageID 来识别发送到 mqtt broker 的消息。
第 1 步 - 发布消息:
ObjectMapper objectMapper = new ObjectMapper();
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(objectMapper.writeValueAsString(myObject).getBytes());
mqttMessage.setQos(1);
IMqttDeliveryToken iMqttDeliveryToken = this.client.publish("/myTopic", mqttMessage);
步骤 2 - 在数据库中保存消息:
从 IMqttDeliveryToken 中我得到了 messageID。我用它来保存和识别数据库中的消息。
第 3 步 - 等待调用 deliveryComplete 回调:
这为我提供了相同的 IMqttDeliveryToken,我可以在其中再次获取 messageId。
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
// delete the database entry via messageId from database
}
问题是第 3 步可能比第 2 步快。所以回调在我的条目保存在数据库中之前被调用。我需要在发送消息之前知道 messageId 以在调用回调之前保存它。我无法自己生成 messageId 并像这样设置它:
mqttMessage.setId(555);
MQTT 生成自己的 messageId。所以我的问题:
- 是否可以设置自己的messageId?
- 我可以在发布前获取mqtt客户端生成的messageId吗?
不要使用 Paho 库生成的消息的 MQTT id - 因为它
- 无法满足您的需求
- 如果您发送大量消息,可能会重复发送。
相反,使用您自己的 ID(甚至可能由您的数据库自动生成)并将其作为用户定义的 上下文对象 传递,当 publishing:
Long databaseId = 42;
ObjectMapper objectMapper = new ObjectMapper();
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(objectMapper.writeValueAsString(myObject).getBytes());
mqttMessage.setQos(1);
this.client.publish("/myTopic", mqttMessage, databaseId, mPublishCallback);
稍后您可以在发布回调方法中检索 id:
private final IMqttActionListener mPublishCallback = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken publishToken) {
Long databaseId = (Long) publishToken.getUserContext();
}
@Override
public void onFailure(IMqttToken publishToken, Throwable ex) {
Long databaseId = (Long) publishToken.getUserContext();
}
};
另外,您使用的是同步客户端吗?我更喜欢使用 IMqttAsyncClient
我在 Java 中使用 org.eclipse.paho.client.mqttv3 版本 1.2.0 开发了一个应用程序。通过 iMqttDeliveryToken 的 messageID 来识别发送到 mqtt broker 的消息。
第 1 步 - 发布消息:
ObjectMapper objectMapper = new ObjectMapper();
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(objectMapper.writeValueAsString(myObject).getBytes());
mqttMessage.setQos(1);
IMqttDeliveryToken iMqttDeliveryToken = this.client.publish("/myTopic", mqttMessage);
步骤 2 - 在数据库中保存消息:
从 IMqttDeliveryToken 中我得到了 messageID。我用它来保存和识别数据库中的消息。
第 3 步 - 等待调用 deliveryComplete 回调:
这为我提供了相同的 IMqttDeliveryToken,我可以在其中再次获取 messageId。
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
// delete the database entry via messageId from database
}
问题是第 3 步可能比第 2 步快。所以回调在我的条目保存在数据库中之前被调用。我需要在发送消息之前知道 messageId 以在调用回调之前保存它。我无法自己生成 messageId 并像这样设置它:
mqttMessage.setId(555);
MQTT 生成自己的 messageId。所以我的问题:
- 是否可以设置自己的messageId?
- 我可以在发布前获取mqtt客户端生成的messageId吗?
不要使用 Paho 库生成的消息的 MQTT id - 因为它
- 无法满足您的需求
- 如果您发送大量消息,可能会重复发送。
相反,使用您自己的 ID(甚至可能由您的数据库自动生成)并将其作为用户定义的 上下文对象 传递,当 publishing:
Long databaseId = 42;
ObjectMapper objectMapper = new ObjectMapper();
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(objectMapper.writeValueAsString(myObject).getBytes());
mqttMessage.setQos(1);
this.client.publish("/myTopic", mqttMessage, databaseId, mPublishCallback);
稍后您可以在发布回调方法中检索 id:
private final IMqttActionListener mPublishCallback = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken publishToken) {
Long databaseId = (Long) publishToken.getUserContext();
}
@Override
public void onFailure(IMqttToken publishToken, Throwable ex) {
Long databaseId = (Long) publishToken.getUserContext();
}
};
另外,您使用的是同步客户端吗?我更喜欢使用 IMqttAsyncClient