使用 Mqttclient 异步并发处理多个消息
Process multiple messaging with Mqttclient asynchronously and concurrently
我正在开发一个使用来自 MQTT 主题的消息的程序,我的目标是我可以异步使用和处理多条消息。
我正在使用 eclipse 客户端:
https://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttClient.html
https://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttAsyncClient.html
问题是几条消息不是同时处理的,都是在同一个线程中执行的。我不太了解使用 MqttClient 和 MqttAsyncClient 之间的区别。 javadoc 说:
MqttClient
Lightweight client for talking to an MQTT server using methods that
block until an operation completes.
MqttAsyncClient
Lightweight client for talking to an MQTT server using non-blocking
methods that allow an operation to run in the background.
我也不太清楚使用方法"subscribe"或"setCallback"之间的区别。只有 "subscribe" 你可以声明多个监听器:
设置回调
Sets a callback listener to use for events that happen asynchronously.
subscribe
Subscribe to a topic...
它试图同时发送十条消息。我的测试如下:
public class FooListener implements IMqttMessageListener {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Thread [ " + Thread.currentThread().getName() +
"], Topic[ "+ topic + "], Message [" + message +"] ");
}
}
public class FooCallbackListener implements MqttCallback {
@Override
public void connectionLost(Throwable e) {
e.printStackTrace();
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//TODO:emtpy
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Thread [ " + Thread.currentThread().getName() +
"], Topic[ "+ topic + "], Message [" + message +"] ");
}
}
MqttClient 和订阅:
public class FooMqttClient {
public static void main(String[] args) {
MqttConnectOptions connOpt = new MqttConnectOptions();
connOpt.setCleanSession(true);
connOpt.setKeepAliveInterval(30);
String serverUri = "tcp://iot.eclipse.org:1883";
String clientId = UUID.randomUUID().toString();
try {
MqttClient myClient = new MqttClient(serverUri, clientId);
myClient.connect(connOpt);
myClient.subscribe("topic/foo", new FooListener());
} catch (MqttException e) {
e.printStackTrace();
}
}
}
结果:
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[topic/foo], Message [Foo 0]
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 1]
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 2]
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 3]
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 4]
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 5]
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 6]
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 7]
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 8]
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 9]
MqttClient 和 setCallback:
public class FooMqttCallbackClient {
public static void main(String[] args) {
MqttConnectOptions connOpt = new MqttConnectOptions();
connOpt.setCleanSession(true);
connOpt.setKeepAliveInterval(30);
String serverUri = "tcp://iot.eclipse.org:1883";
String clientId = UUID.randomUUID().toString();
try {
MqttClient myClient = new MqttClient(serverUri, clientId);
myClient.connect(connOpt);
myClient.subscribe("topic/foo");
myClient.setCallback(new FooCallbackListener());
} catch (MqttException e) {
e.printStackTrace();
}
}
}
结果:
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 0]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 1]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 2]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 3]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 4]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 5]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 6]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 7]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 8]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 9]
MqttAsyncClient 和订阅:
public class FooAsyncMqttClient {
public static void main(String[] args) {
MqttConnectOptions connOpt = new MqttConnectOptions();
connOpt.setCleanSession(true);
connOpt.setKeepAliveInterval(30);
String serverUri = "tcp://iot.eclipse.org:1883";
String clientId = UUID.randomUUID().toString();
try {
MqttAsyncClient myClient = new MqttAsyncClient(serverUri, clientId);
myClient.connect(connOpt);
Thread.sleep(1000);
myClient.subscribe("topic/foo", 1, new FooListener());
} catch (MqttException e) {
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
结果:
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 0]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 1]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 2]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 3]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 4]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 5]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 6]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 7]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 8]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 9]
MqttAsyncClient 和 setCallback
public class FooAsyncMqttCallbackClient {
public static void main(String[] args) {
MqttConnectOptions connOpt = new MqttConnectOptions();
connOpt.setCleanSession(true);
connOpt.setKeepAliveInterval(30);
String serverUri = "tcp://iot.eclipse.org:1883";
String clientId = UUID.randomUUID().toString();
try {
MqttAsyncClient myClient = new MqttAsyncClient(serverUri, clientId);
myClient.connect(connOpt);
Thread.sleep(1000);
myClient.subscribe("topic/foo", 1);
myClient.setCallback(new FooCallbackListener());
} catch (MqttException e) {
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
结果:
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 0]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 1]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 2]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 3]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 4]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 5]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 6]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 7]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 8]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 9]
在我所有的测试中,侦听器都在同一个线程中执行,而不是同时执行。我该怎么做才能同时并发地处理消息? MqttClient 和 MqttAsyncClient 有什么区别?
解法:
public class FooExecutorListener implements IMqttMessageListener {
private ExecutorService pool = Executors.newFixedThreadPool(10);
class MessageHandler implements Runnable {
MqttMessage message;
String topic;
public MessageHandler(String topic, MqttMessage message) {
this.message = message;
this.topic = topic;
}
public void run() {
System.out.println("Thread [ " + Thread.currentThread().getName() +
"], Topic[ "+ topic + "], Message [" + message +"] ");
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
pool.execute(new MessageHandler(topic, message));
}
}
结果:
Thread [ pool-2-thread-1], Topic[ topic/foo], Message [Foo 0]
Thread [ pool-2-thread-2], Topic[ topic/foo], Message [Foo 1]
Thread [ pool-2-thread-5], Topic[ topic/foo], Message [Foo 4]
Thread [ pool-2-thread-4], Topic[ topic/foo], Message [Foo 3]
Thread [ pool-2-thread-7], Topic[ topic/foo], Message [Foo 6]
Thread [ pool-2-thread-6], Topic[ topic/foo], Message [Foo 5]
Thread [ pool-2-thread-8], Topic[ topic/foo], Message [Foo 7]
Thread [ pool-2-thread-3], Topic[ topic/foo], Message [Foo 2]
Thread [ pool-2-thread-1], Topic[ topic/foo], Message [Foo 10]
Thread [ pool-2-thread-2], Topic[ topic/foo], Message [Foo 11]
Thread [ pool-2-thread-5], Topic[ topic/foo], Message [Foo 12]
Thread [ pool-2-thread-5], Topic[ topic/foo], Message [Foo 13]
Thread [ pool-2-thread-5], Topic[ topic/foo], Message [Foo 14]
Thread [ pool-2-thread-9], Topic[ topic/foo], Message [Foo 8]
Thread [ pool-2-thread-10], Topic[ topic/foo], Message [Foo 9]
2 版客户端的区别在于 connecting/publishing 不订阅。异步版本将无阻塞地连接和发布。
两种情况下的订阅处理都在后台网络处理。
如果您想并行处理传入消息,则需要实现自己的线程池并将传入消息分发到池中。
最简单的方法是使用 Java 的 ExecutorService class。例如
public class FooListener implements IMqttMessageListener {
ExecutorService pool = new ExecutorService(10);
class MessageHandler implements Runnable {
MqttMessage message;
String topic;
public MessageHandler(String topic; MqttMessage message) {
this.message = message;
}
public void run() {
//process message
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
pool.execute(new MessageHandler(topic,message));
}
}
我正在开发一个使用来自 MQTT 主题的消息的程序,我的目标是我可以异步使用和处理多条消息。
我正在使用 eclipse 客户端: https://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttClient.html https://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttAsyncClient.html
问题是几条消息不是同时处理的,都是在同一个线程中执行的。我不太了解使用 MqttClient 和 MqttAsyncClient 之间的区别。 javadoc 说:
MqttClient
Lightweight client for talking to an MQTT server using methods that block until an operation completes.
MqttAsyncClient
Lightweight client for talking to an MQTT server using non-blocking methods that allow an operation to run in the background.
我也不太清楚使用方法"subscribe"或"setCallback"之间的区别。只有 "subscribe" 你可以声明多个监听器: 设置回调
Sets a callback listener to use for events that happen asynchronously. subscribe Subscribe to a topic...
它试图同时发送十条消息。我的测试如下:
public class FooListener implements IMqttMessageListener {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Thread [ " + Thread.currentThread().getName() +
"], Topic[ "+ topic + "], Message [" + message +"] ");
}
}
public class FooCallbackListener implements MqttCallback {
@Override
public void connectionLost(Throwable e) {
e.printStackTrace();
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//TODO:emtpy
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Thread [ " + Thread.currentThread().getName() +
"], Topic[ "+ topic + "], Message [" + message +"] ");
}
}
MqttClient 和订阅:
public class FooMqttClient {
public static void main(String[] args) {
MqttConnectOptions connOpt = new MqttConnectOptions();
connOpt.setCleanSession(true);
connOpt.setKeepAliveInterval(30);
String serverUri = "tcp://iot.eclipse.org:1883";
String clientId = UUID.randomUUID().toString();
try {
MqttClient myClient = new MqttClient(serverUri, clientId);
myClient.connect(connOpt);
myClient.subscribe("topic/foo", new FooListener());
} catch (MqttException e) {
e.printStackTrace();
}
}
}
结果:
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[topic/foo], Message [Foo 0]
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 1]
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 2]
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 3]
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 4]
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 5]
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 6]
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 7]
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 8]
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 9]
MqttClient 和 setCallback:
public class FooMqttCallbackClient {
public static void main(String[] args) {
MqttConnectOptions connOpt = new MqttConnectOptions();
connOpt.setCleanSession(true);
connOpt.setKeepAliveInterval(30);
String serverUri = "tcp://iot.eclipse.org:1883";
String clientId = UUID.randomUUID().toString();
try {
MqttClient myClient = new MqttClient(serverUri, clientId);
myClient.connect(connOpt);
myClient.subscribe("topic/foo");
myClient.setCallback(new FooCallbackListener());
} catch (MqttException e) {
e.printStackTrace();
}
}
}
结果:
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 0]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 1]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 2]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 3]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 4]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 5]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 6]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 7]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 8]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 9]
MqttAsyncClient 和订阅:
public class FooAsyncMqttClient {
public static void main(String[] args) {
MqttConnectOptions connOpt = new MqttConnectOptions();
connOpt.setCleanSession(true);
connOpt.setKeepAliveInterval(30);
String serverUri = "tcp://iot.eclipse.org:1883";
String clientId = UUID.randomUUID().toString();
try {
MqttAsyncClient myClient = new MqttAsyncClient(serverUri, clientId);
myClient.connect(connOpt);
Thread.sleep(1000);
myClient.subscribe("topic/foo", 1, new FooListener());
} catch (MqttException e) {
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
结果:
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 0]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 1]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 2]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 3]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 4]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 5]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 6]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 7]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 8]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 9]
MqttAsyncClient 和 setCallback
public class FooAsyncMqttCallbackClient {
public static void main(String[] args) {
MqttConnectOptions connOpt = new MqttConnectOptions();
connOpt.setCleanSession(true);
connOpt.setKeepAliveInterval(30);
String serverUri = "tcp://iot.eclipse.org:1883";
String clientId = UUID.randomUUID().toString();
try {
MqttAsyncClient myClient = new MqttAsyncClient(serverUri, clientId);
myClient.connect(connOpt);
Thread.sleep(1000);
myClient.subscribe("topic/foo", 1);
myClient.setCallback(new FooCallbackListener());
} catch (MqttException e) {
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
结果:
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 0]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 1]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 2]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 3]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 4]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 5]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 6]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 7]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 8]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 9]
在我所有的测试中,侦听器都在同一个线程中执行,而不是同时执行。我该怎么做才能同时并发地处理消息? MqttClient 和 MqttAsyncClient 有什么区别?
解法:
public class FooExecutorListener implements IMqttMessageListener {
private ExecutorService pool = Executors.newFixedThreadPool(10);
class MessageHandler implements Runnable {
MqttMessage message;
String topic;
public MessageHandler(String topic, MqttMessage message) {
this.message = message;
this.topic = topic;
}
public void run() {
System.out.println("Thread [ " + Thread.currentThread().getName() +
"], Topic[ "+ topic + "], Message [" + message +"] ");
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
pool.execute(new MessageHandler(topic, message));
}
}
结果:
Thread [ pool-2-thread-1], Topic[ topic/foo], Message [Foo 0]
Thread [ pool-2-thread-2], Topic[ topic/foo], Message [Foo 1]
Thread [ pool-2-thread-5], Topic[ topic/foo], Message [Foo 4]
Thread [ pool-2-thread-4], Topic[ topic/foo], Message [Foo 3]
Thread [ pool-2-thread-7], Topic[ topic/foo], Message [Foo 6]
Thread [ pool-2-thread-6], Topic[ topic/foo], Message [Foo 5]
Thread [ pool-2-thread-8], Topic[ topic/foo], Message [Foo 7]
Thread [ pool-2-thread-3], Topic[ topic/foo], Message [Foo 2]
Thread [ pool-2-thread-1], Topic[ topic/foo], Message [Foo 10]
Thread [ pool-2-thread-2], Topic[ topic/foo], Message [Foo 11]
Thread [ pool-2-thread-5], Topic[ topic/foo], Message [Foo 12]
Thread [ pool-2-thread-5], Topic[ topic/foo], Message [Foo 13]
Thread [ pool-2-thread-5], Topic[ topic/foo], Message [Foo 14]
Thread [ pool-2-thread-9], Topic[ topic/foo], Message [Foo 8]
Thread [ pool-2-thread-10], Topic[ topic/foo], Message [Foo 9]
2 版客户端的区别在于 connecting/publishing 不订阅。异步版本将无阻塞地连接和发布。
两种情况下的订阅处理都在后台网络处理。
如果您想并行处理传入消息,则需要实现自己的线程池并将传入消息分发到池中。
最简单的方法是使用 Java 的 ExecutorService class。例如
public class FooListener implements IMqttMessageListener {
ExecutorService pool = new ExecutorService(10);
class MessageHandler implements Runnable {
MqttMessage message;
String topic;
public MessageHandler(String topic; MqttMessage message) {
this.message = message;
}
public void run() {
//process message
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
pool.execute(new MessageHandler(topic,message));
}
}