我如何知道 ACK 对应于 MQTT 上的哪个发布消息?
How can I tell an ACK corresponds to which publish message on MQTT?
我正在为 Mqtt paho 驱动程序苦苦挣扎...
我正在使用 IMqttDeliveryToken 在收到我的发布时从服务器获取确认。
为了将它与实际发布的消息进行比较,我在 MqttMessage 上设置了一个 ID,以便从 IMqttDeliveryToken 中检索它...但是它不起作用...IMqttDeliveryToken.getMessageId() returns 一个不正确的 ID,当我在 QoS 不同于 0 时尝试在 IMqttDeliveryToken.getMessage() 之后获取 ID,它 returns 一个 NPE。
阅读 Javadoc 后,我了解到这是通常的行为:
Until the message has been delivered, the message being delivered will be returned. Once the message has been delivered null will be returned.
这让我想到另一个问题... deliveryComplete() 方法真的是在 Broker 发送确认后调用的吗?
这是我的代码:
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable thrwbl) { }
@Override
public void messageArrived(String string, MqttMessage mm) throws Exception { }
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try {
System.out.println("Message ID from getMessageId() method : " + token.getMessageId());
MqttMessage message = token.getMessage();
System.out.println("Message ID from getMessage() method : " + message.getId());
} catch (MqttException ex) {
System.out.println(ex);
} catch (Exception ex) {
System.out.println(ex);
}
}
});
MqttMessage message = new MqttMessage();
message.setId(76);
message.setPayload("pouet".getBytes());
message.setQos(0);
client.publish("TEST", message);
QoS 为 0 时:
Message ID from getMessageId() method : 1
Message ID from getMessage() method : 76
QoS 为 1 时:
Message ID from getMessageId() method : 1
java.lang.NullPointerException
如gitMqttMessage.java
所述
/**
* This is only to be used internally to provide the MQTT id of a message
* received from the server. Has no effect when publishing messages.
* @param messageId
*/
public void setId(int messageId) {
this.messageId = messageId;
}
这在发布消息时没有用到。现在要了解为什么来自 getMessageId() 方法的消息 ID:1 发生了,请查看以下内容。
public IMqttDeliveryToken publish(String topic, MqttMessage message, Object userContext, IMqttActionListener callback) throws MqttException,
MqttPersistenceException {
final String methodName = "publish";
//@TRACE 111=< topic={0} message={1}userContext={1} callback={2}
log.fine(CLASS_NAME,methodName,"111", new Object[] {topic, userContext, callback});
//Checks if a topic is valid when publishing a message.
MqttTopic.validate(topic, false/*wildcards NOT allowed*/);
MqttDeliveryToken token = new MqttDeliveryToken(getClientId());
token.setActionCallback(callback);
token.setUserContext(userContext);
token.setMessage(message);
token.internalTok.setTopics(new String[] {topic});
MqttPublish pubMsg = new MqttPublish(topic, message);
comms.sendNoWait(pubMsg, token);
//@TRACE 112=<
log.fine(CLASS_NAME,methodName,"112");
return token;
}
MqttDeliveryToken 不在此处设置消息 id。发布时会创建 MqttPublish 实例,该实例在内部将多级扩展到 MqttWireMessage.java,默认情况下该值设置为 0。
public MqttWireMessage(byte type) {
this.type = type;
// Use zero as the default message ID. Can't use -1, as that is serialized
// as 65535, which would be a valid ID.
this.msgId = 0;
}
当在 ClientState.java 中为 mqtt 发布调用最终发送时,由于 if 条件为真且 getNextMessageId( ) 被称为 which returns 1 (因为它是第一条消息,否则它会根据最后一条消息 ID 返回后续值)并设置为令牌,您在 deliveryComplete().
public void send(MqttWireMessage message, MqttToken token) throws MqttException {
final String methodName = "send";
if (message.isMessageIdRequired() && (message.getMessageId() == 0)) {
message.setMessageId(getNextMessageId());
}
if (token != null ) {
try {
token.internalTok.setMessageID(message.getMessageId());
} catch (Exception e) {
}
}
/////......
}
回答您的下一个问题:deliveryComplete() 方法是否真的在 Broker 发送确认后被调用 --- 是!!
从这一小段代码调用完成的交付。
private void handleActionComplete(MqttToken token)
throws MqttException {
final String methodName = "handleActionComplete";
synchronized (token) {
// @TRACE 705=callback and notify for key={0}
log.fine(CLASS_NAME, methodName, "705", new Object[] { token.internalTok.getKey() });
if (token.isComplete()) {
// Finish by doing any post processing such as delete
// from persistent store but only do so if the action
// is complete
clientState.notifyComplete(token);
}
// Unblock any waiters and if pending complete now set completed
token.internalTok.notifyComplete();
if (!token.internalTok.isNotified()) {
// If a callback is registered and delivery has finished
// call delivery complete callback.
if ( mqttCallback != null
&& token instanceof MqttDeliveryToken
&& token.isComplete()) {
mqttCallback.deliveryComplete((MqttDeliveryToken) token);
}
// Now call async action completion callbacks
fireActionEvent(token);
}
// Set notified so we don't tell the user again about this action.
if ( token.isComplete() ){
if ( token instanceof MqttDeliveryToken || token.getActionCallback() instanceof IMqttActionListener ) {
token.internalTok.setNotified(true);
}
}
}
}
即一旦收到 ack,即完成 notifyComplete,设置标志,调用 deliveryComplete 方法。
我正在为 Mqtt paho 驱动程序苦苦挣扎...
我正在使用 IMqttDeliveryToken 在收到我的发布时从服务器获取确认。
为了将它与实际发布的消息进行比较,我在 MqttMessage 上设置了一个 ID,以便从 IMqttDeliveryToken 中检索它...但是它不起作用...IMqttDeliveryToken.getMessageId() returns 一个不正确的 ID,当我在 QoS 不同于 0 时尝试在 IMqttDeliveryToken.getMessage() 之后获取 ID,它 returns 一个 NPE。
阅读 Javadoc 后,我了解到这是通常的行为:
Until the message has been delivered, the message being delivered will be returned. Once the message has been delivered null will be returned.
这让我想到另一个问题... deliveryComplete() 方法真的是在 Broker 发送确认后调用的吗?
这是我的代码:
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable thrwbl) { }
@Override
public void messageArrived(String string, MqttMessage mm) throws Exception { }
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try {
System.out.println("Message ID from getMessageId() method : " + token.getMessageId());
MqttMessage message = token.getMessage();
System.out.println("Message ID from getMessage() method : " + message.getId());
} catch (MqttException ex) {
System.out.println(ex);
} catch (Exception ex) {
System.out.println(ex);
}
}
});
MqttMessage message = new MqttMessage();
message.setId(76);
message.setPayload("pouet".getBytes());
message.setQos(0);
client.publish("TEST", message);
QoS 为 0 时:
Message ID from getMessageId() method : 1
Message ID from getMessage() method : 76
QoS 为 1 时:
Message ID from getMessageId() method : 1
java.lang.NullPointerException
如gitMqttMessage.java
所述/**
* This is only to be used internally to provide the MQTT id of a message
* received from the server. Has no effect when publishing messages.
* @param messageId
*/
public void setId(int messageId) {
this.messageId = messageId;
}
这在发布消息时没有用到。现在要了解为什么来自 getMessageId() 方法的消息 ID:1 发生了,请查看以下内容。
public IMqttDeliveryToken publish(String topic, MqttMessage message, Object userContext, IMqttActionListener callback) throws MqttException,
MqttPersistenceException {
final String methodName = "publish";
//@TRACE 111=< topic={0} message={1}userContext={1} callback={2}
log.fine(CLASS_NAME,methodName,"111", new Object[] {topic, userContext, callback});
//Checks if a topic is valid when publishing a message.
MqttTopic.validate(topic, false/*wildcards NOT allowed*/);
MqttDeliveryToken token = new MqttDeliveryToken(getClientId());
token.setActionCallback(callback);
token.setUserContext(userContext);
token.setMessage(message);
token.internalTok.setTopics(new String[] {topic});
MqttPublish pubMsg = new MqttPublish(topic, message);
comms.sendNoWait(pubMsg, token);
//@TRACE 112=<
log.fine(CLASS_NAME,methodName,"112");
return token;
}
MqttDeliveryToken 不在此处设置消息 id。发布时会创建 MqttPublish 实例,该实例在内部将多级扩展到 MqttWireMessage.java,默认情况下该值设置为 0。
public MqttWireMessage(byte type) {
this.type = type;
// Use zero as the default message ID. Can't use -1, as that is serialized
// as 65535, which would be a valid ID.
this.msgId = 0;
}
当在 ClientState.java 中为 mqtt 发布调用最终发送时,由于 if 条件为真且 getNextMessageId( ) 被称为 which returns 1 (因为它是第一条消息,否则它会根据最后一条消息 ID 返回后续值)并设置为令牌,您在 deliveryComplete().
public void send(MqttWireMessage message, MqttToken token) throws MqttException {
final String methodName = "send";
if (message.isMessageIdRequired() && (message.getMessageId() == 0)) {
message.setMessageId(getNextMessageId());
}
if (token != null ) {
try {
token.internalTok.setMessageID(message.getMessageId());
} catch (Exception e) {
}
}
/////......
}
回答您的下一个问题:deliveryComplete() 方法是否真的在 Broker 发送确认后被调用 --- 是!!
从这一小段代码调用完成的交付。
private void handleActionComplete(MqttToken token)
throws MqttException {
final String methodName = "handleActionComplete";
synchronized (token) {
// @TRACE 705=callback and notify for key={0}
log.fine(CLASS_NAME, methodName, "705", new Object[] { token.internalTok.getKey() });
if (token.isComplete()) {
// Finish by doing any post processing such as delete
// from persistent store but only do so if the action
// is complete
clientState.notifyComplete(token);
}
// Unblock any waiters and if pending complete now set completed
token.internalTok.notifyComplete();
if (!token.internalTok.isNotified()) {
// If a callback is registered and delivery has finished
// call delivery complete callback.
if ( mqttCallback != null
&& token instanceof MqttDeliveryToken
&& token.isComplete()) {
mqttCallback.deliveryComplete((MqttDeliveryToken) token);
}
// Now call async action completion callbacks
fireActionEvent(token);
}
// Set notified so we don't tell the user again about this action.
if ( token.isComplete() ){
if ( token instanceof MqttDeliveryToken || token.getActionCallback() instanceof IMqttActionListener ) {
token.internalTok.setNotified(true);
}
}
}
}
即一旦收到 ack,即完成 notifyComplete,设置标志,调用 deliveryComplete 方法。