How to receive multiple messages in HiveMQ Client? (MQTT)
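I am trying to figure out how to receive multiple messages in the HiveMQ client within the same try/catch block, even when different clients are involved. I followed this example:
The example above works for one client with a single publish and subscribe, but if possible I would like to perform several of these operations in the same try/catch block.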
package com.main;

import java.util.UUID;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient.Mqtt5Publishes;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import java.util.logging.Logger;
import java.util.NoSuchElementException;
import java.util.logging.Level;
import java.util.concurrent.TimeUnit;

public class Main {

    private static final Logger LOGGER = Logger.getLogger(Main.class.getName()); // creates a logger instance

    public static void main(String[] args) {
        Mqtt5BlockingClient client1 = Mqtt5Client.builder()
                .identifier(UUID.randomUUID().toString()) // unique identifier of the MQTT client (a random UUID)
                .serverHost("localhost") // host name or IP address of the MQTT server; localhost is the default if not specified
                .serverPort(1883) // port of the server
                .buildBlocking(); // builds the blocking client
        client1.connect(); // connects the client
        System.out.println("Client1 Connected");

        Mqtt5BlockingClient client2 = Mqtt5Client.builder()
                .identifier(UUID.randomUUID().toString()) // unique identifier of the MQTT client (a random UUID)
                .serverHost("localhost") // host name or IP address of the MQTT server; localhost is the default if not specified
                .serverPort(1883) // port of the server
                .buildBlocking(); // builds the blocking client
        client2.connect(); // connects the client
        System.out.println("Client2 Connected");

        String testmessage = "How is it going!";
        byte[] messagebytesend = testmessage.getBytes(); // stores the message as a byte array to be used as the payload

        try {
            // creates a "publishes" instance that queues incoming messages for client1
            // .ALL - matches all incoming Publish messages
            Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL);

            client1.subscribeWith() // creates a subscription
                    .topicFilter("test/something1/topic") // receive messages only on this topic (# = multi-level wildcard, + = single-level wildcard)
                    .qos(MqttQos.AT_LEAST_ONCE) // sets QoS 1 (at least once)
                    .send();
            System.out.println("The client1 has subscribed");

            client1.publishWith() // publishes the message to the subscribed topic
                    .topic("test/something1/topic") // publishes to the specified topic
                    .qos(MqttQos.AT_LEAST_ONCE)
                    .payload(messagebytesend) // the contents of the message
                    .send();
            System.out.println("The client1 has published");

            // waits up to 5 seconds for a message on the "publishes" instance;
            // .get() returns the message if present or throws a NoSuchElementException
            Mqtt5Publish receivedMessage = publishes.receive(5, TimeUnit.SECONDS).get();
            byte[] tempdata = receivedMessage.getPayloadAsBytes(); // gets the payload as a byte array
            System.out.println();
            String getdata = new String(tempdata); // converts the byte array to a String
            System.out.println(getdata);

            client2.subscribeWith() // creates a subscription
                    .topicFilter("test/something2/topic") // receive messages only on this topic (# = multi-level wildcard, + = single-level wildcard)
                    .qos(MqttQos.AT_LEAST_ONCE) // sets QoS 1 (at least once)
                    .send();
            System.out.println("The client2 has subscribed");

            client2.publishWith() // publishes the message to the subscribed topic
                    .topic("test/something2/topic") // publishes to the specified topic
                    .qos(MqttQos.AT_LEAST_ONCE)
                    .payload("The second message :P".getBytes()) // the contents of the message
                    .send();
            System.out.println("The client2 has published");
            System.out.println();

            // VV Why isn't the publish instance below receiving the second message? Do i need another try catch? VV
            // waits up to 5 seconds; .get() returns the message if present or throws a NoSuchElementException
            receivedMessage = publishes.receive(5, TimeUnit.SECONDS).get();
            byte[] tempdata2 = receivedMessage.getPayloadAsBytes(); // gets the payload as a byte array
            System.out.println();
            getdata = new String(tempdata2); // converts the byte array to a String
            System.out.println(getdata);
        }
        catch (InterruptedException e) { // the thread was interrupted while waiting
            LOGGER.log(Level.SEVERE, "The thread was interrupted while waiting for a message to be received", e);
        }
        catch (NoSuchElementException e) {
            System.out.println("There are no received messages"); // no message arrived within the timeout
        }

        client1.disconnect();
        System.out.println("Client1 Disconnected");
        client2.disconnect();
        System.out.println("Client2 Disconnected");
    }
}
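The output I get:
Client1 Connected
Client2 Connected
The client1 has subscribed
The client1 has published
How is it going!
The client2 has subscribed
The client2 has published
There are no received messages
Client1 Disconnected
Client2 Disconnected
The output I want:
Client1 Connected
Client2 Connected
The client1 has subscribed
The client1 has published
How is it going!
The client2 has subscribed
The client2 has published
The second message :P
Client1 Disconnected
Client2 Disconnected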
I ran your code and found this WARN log:
2019-06-11 20:32:22,774 WARN - No publish flow registered for MqttStatefulPublish{stateless=MqttPublish{topic=test/something2/topic, payload=21byte, qos=AT_LEAST_ONCE, retain=false}, packetIdentifier=51, dup=false, topicAlias=0, subscriptionIdentifiers=[1]}.
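It looks like you forgot to set up a publish filter for your second client. In the code where you wait for the second message (for client2), you are actually checking the message flow of client1.
So you only need to add a publish filter for client2: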
Mqtt5Publishes publishesClient2 = client2.publishes(MqttGlobalPublishFilter.ALL);
And then wait for the message on client2:
// VV Why isn't the publish instance below receiving the second message? Do i need another try catch? VV
receivedMessage = publishesClient2.receive(5,TimeUnit.SECONDS).get();
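To make the reasoning above concrete, here is a small toy model written with plain java.util.concurrent. It is only a sketch and not the HiveMQ API: the class PerClientInboxDemo and its methods register, deliver and receive are hypothetical names chosen for illustration. It assumes, as the WARN log suggests, that each client has its own flow of incoming messages and that a message arriving for a client without a registered flow is dropped.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy stand-in for per-client message flows (hypothetical, not the HiveMQ API). */
class PerClientInboxDemo {

    // One queue per client id: each client only ever sees its own inbox.
    private final Map<String, BlockingQueue<String>> inboxes = new ConcurrentHashMap<>();

    /** Registers (or returns the existing) inbox for a client. */
    BlockingQueue<String> register(String clientId) {
        return inboxes.computeIfAbsent(clientId, id -> new LinkedBlockingQueue<>());
    }

    /** Delivers a payload to one client; returns false if that client has no inbox yet. */
    boolean deliver(String clientId, String payload) {
        BlockingQueue<String> inbox = inboxes.get(clientId);
        return inbox != null && inbox.offer(payload);
    }

    /** Waits up to the timeout for the next message in the given inbox. */
    static Optional<String> receive(BlockingQueue<String> inbox, long timeout, TimeUnit unit)
            throws InterruptedException {
        return Optional.ofNullable(inbox.poll(timeout, unit));
    }

    public static void main(String[] args) throws InterruptedException {
        PerClientInboxDemo broker = new PerClientInboxDemo();
        BlockingQueue<String> inbox1 = broker.register("client1");

        // client2 has no inbox yet, so this delivery is dropped.
        System.out.println("delivered to client2: " + broker.deliver("client2", "The second message :P"));

        // Polling client1's inbox cannot return a message addressed to client2.
        System.out.println("client1 sees: " + receive(inbox1, 100, TimeUnit.MILLISECONDS));

        // Once client2 has its own inbox, the message arrives where it is awaited.
        BlockingQueue<String> inbox2 = broker.register("client2");
        broker.deliver("client2", "The second message :P");
        System.out.println("client2 sees: " + receive(inbox2, 100, TimeUnit.MILLISECONDS));
    }
}
```

In the real program the same idea means calling publishes(...) on client2 and receiving from the instance it returns, as in the two lines above.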
Result:
Before:
Client1 Connected
Client2 Connected
The client1 has subscribed
The client1 has published
How is it going!
The client2 has subscribed
The client2 has published
2019-06-11 20:46:36,537 WARN - No publish flow registered for MqttStatefulPublish{stateless=MqttPublish{topic=test/something2/topic, payload=21byte, qos=AT_LEAST_ONCE, retain=false}, packetIdentifier=51, dup=false, topicAlias=0, subscriptionIdentifiers=[1]}.
There are no received messages
Client1 Disconnected
Client2 Disconnected
After:
Client1 Connected
Client2 Connected
The client1 has subscribed
The client1 has published
How is it going!
The client2 has subscribed
The client2 has published
The second message :P
Client1 Disconnected
Client2 Disconnected
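EDIT: I hope this is the solution you are looking for, since the desired output differs from the output I get with the fix. With the fix, the NoSuchElementException is no longer thrown or caught, so the line "There are no received messages" no longer appears after the second message.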
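Since receive(5, TimeUnit.SECONDS) in the question's code hands back an Optional, the empty case can also be handled without calling .get() and catching NoSuchElementException. The following is a minimal sketch using only java.util.Optional; the class OptionalReceiveDemo and the method describe are hypothetical names, and the Optional<byte[]> stands in for the payload of a received message.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/** Shows handling of a possibly missing message without Optional.get() (illustrative only). */
class OptionalReceiveDemo {

    /** Converts the payload to text, or falls back to a notice when nothing was received. */
    static String describe(Optional<byte[]> payload) {
        return payload
                .map(bytes -> new String(bytes, StandardCharsets.UTF_8))
                .orElse("There are no received messages");
    }

    public static void main(String[] args) {
        byte[] bytes = "The second message :P".getBytes(StandardCharsets.UTF_8);
        System.out.println(describe(Optional.of(bytes)));   // message present
        System.out.println(describe(Optional.empty()));     // timeout elapsed, nothing received
    }
}
```

Applied to the client code, the Optional returned by receive could be mapped with Mqtt5Publish::getPayloadAsBytes before being passed to a helper like this, which makes the second catch block unnecessary.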
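Edit in response to the comment: a snippet that uses the async flavour to collect the publish messages for client2 (just replace the code inside the try block with the following):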
// Needs java.util.Collections, java.util.LinkedList and java.util.List in addition to the imports above.
// The list where we put our received publish messages. It is wrapped in a synchronized list
// because the consumer below is not called from the main thread.
final List<Mqtt5Publish> incomingMessagesClient2 = Collections.synchronizedList(new LinkedList<>());

// With the async flavour we can add a consumer for the incoming publish messages
client2.toAsync().publishes(MqttGlobalPublishFilter.ALL, mqtt5Publish ->
        incomingMessagesClient2.add(mqtt5Publish));

client1.publishes(MqttGlobalPublishFilter.ALL); // creates a "publishes" instance for client1 (nothing is read from it here)

client2.subscribeWith() // creates a subscription
        .topicFilter("test/something1/topic") // receive messages only on this topic (# = multi-level wildcard, + = single-level wildcard)
        .qos(MqttQos.AT_LEAST_ONCE) // sets QoS 1 (at least once)
        .send();
System.out.println("The client2 has subscribed");

client1.publishWith() // publishes the first message to the subscribed topic
        .topic("test/something1/topic") // publishes to the specified topic
        .qos(MqttQos.AT_LEAST_ONCE)
        .payload(messagebytesend) // the contents of the message
        .send();
System.out.println("The client1 has published");

client1.publishWith() // publishes the second message to the same topic
        .topic("test/something1/topic") // publishes to the specified topic
        .qos(MqttQos.AT_LEAST_ONCE)
        .payload("The second message :P".getBytes()) // the contents of the message
        .send();
System.out.println("The client1 has published");
System.out.println();

TimeUnit.SECONDS.sleep(5); // gives the consumer time to collect both messages
incomingMessagesClient2.forEach(mqtt5Publish -> System.out.println(new String(mqtt5Publish.getPayloadAsBytes())));
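A possible refinement of the fixed five-second sleep is to wait only until the expected number of messages has arrived. The sketch below uses plain java.util.concurrent and no HiveMQ classes; LatchCollector, onMessage, await, pending and messages are hypothetical names, and the sender thread merely simulates an asynchronous callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Collects messages from a callback and lets the caller wait for a known count (illustrative only). */
class LatchCollector {

    private final List<String> received = new CopyOnWriteArrayList<>(); // safe for cross-thread adds
    private final CountDownLatch latch;

    LatchCollector(int expectedMessages) {
        this.latch = new CountDownLatch(expectedMessages);
    }

    /** Callback to hand to an asynchronous source; may be called from any thread. */
    void onMessage(String payload) {
        received.add(payload);
        latch.countDown();
    }

    /** Blocks until all expected messages arrived or the timeout elapsed; true means all arrived. */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    /** Number of expected messages that have not arrived yet. */
    long pending() {
        return latch.getCount();
    }

    /** Snapshot of the messages received so far, in arrival order. */
    List<String> messages() {
        return new ArrayList<>(received);
    }

    public static void main(String[] args) throws InterruptedException {
        LatchCollector collector = new LatchCollector(2);

        // Simulates the client thread that would invoke the consumer.
        Thread sender = new Thread(() -> {
            collector.onMessage("How is it going!");
            collector.onMessage("The second message :P");
        });
        sender.start();

        // Returns as soon as both messages are in, or after 5 seconds at the latest.
        boolean complete = collector.await(5, TimeUnit.SECONDS);
        System.out.println("all messages received: " + complete);
        collector.messages().forEach(System.out::println);
    }
}
```

In the snippet above, the consumer passed to client2.toAsync().publishes(...) could forward each payload to such a collector, so that the main thread resumes as soon as both messages are in.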
Greetings,
Michael from the HiveMQ team