MQTT Java 将所有订阅消息放在同一个队列中
MQTT Java puts all subscription messages in the same queue
我尝试了下面的应用程序:两个客户端不断地发送消息,另一个客户端负责接收。但看起来 messageArrived 方法是在单个线程中被调用的,因为一旦某个客户端的消息在处理时被卡住,其他客户端的消息也就收不到了。这是我的实现有问题,还是库本身的行为就是如此?请指教。我的代码如下。
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Receiving side of the demo.
 *
 * <p>Connects to the broker at {@code tcp://localhost:1883}, subscribes to the topics
 * {@code foo1} and {@code foo2}, and then starts the two publishers ({@code ClientPub2} and
 * {@code ClientPub1}).
 *
 * <p>{@link #messageArrived} deliberately never returns for {@code foo1}; the demo exists to
 * observe what happens to {@code foo2} deliveries while one callback invocation is stuck.
 */
public class TestMain implements MqttCallback {
    static TestMain t = new TestMain();
    static MqttClient client;

    // Not used anywhere in this class; left in place so that any code referring to these
    // package-visible fields keeps compiling.
    static Timer timer = new Timer();
    static Calendar cal = Calendar.getInstance();
    static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
    static int count = 0;

    /**
     * Connects the receiving client, subscribes to both topics and starts both publishers.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try {
            // Keep client state in memory instead of the client's default persistence.
            client = new MqttClient("tcp://localhost:1883", "Receiving", new MemoryPersistence());

            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);

            // Register the callback before connecting so that no event is delivered
            // while no listener is installed.
            client.setCallback(t);
            // The options only take effect when they are actually handed to connect().
            client.connect(connOpts);
            client.subscribe(new String[] { "foo1", "foo2" });

            ClientPub2 c = new ClientPub2();
            c.sendMessage();
            ClientPub1 c1 = new ClientPub1();
            c1.sendMessage();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reports a lost connection instead of dropping it silently.
     *
     * @param cause the reason the connection was lost; may be {@code null}
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.err.println("Receiving : connection lost");
        if (cause != null) {
            cause.printStackTrace();
        }
    }

    /**
     * Handles an incoming message.
     *
     * <p>For {@code foo1} this method spins forever on purpose (see the class comment); for
     * {@code foo2} it prints the message.
     *
     * @param topic topic the message was published to
     * @param message the received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // MQTT topic names are case-sensitive, so compare exactly.
        if ("foo1".equals(topic)) {
            // Intentional: simulate a handler that never finishes.
            while (true) {
                int i = 0;
            }
        } else if ("foo2".equals(topic)) {
            System.out.println("Received message: foo2 : " + message);
        }
    }

    /**
     * No-op: this client does not publish, so there is nothing to track here.
     *
     * @param token delivery token of the completed publish
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // Nothing to do.
    }
}
另一个客户端发送消息:
import java.util.Timer;
import java.util.TimerTask;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
/**
 * Demo publisher: once started via {@link #sendMessage()}, publishes a numbered text message to
 * topic {@code foo2} every second using client id {@code "Sending"}.
 */
public class ClientPub2 implements MqttCallback {
    MqttClient client;
    int qos = 2;
    Timer timer = new Timer();
    public static int sendCount = 0;

    /**
     * Creates the client, connects it to {@code tcp://localhost:1883} and subscribes to
     * {@code foo2}.
     *
     * <p>Any MQTT failure is printed and the instance is still returned; if the client could
     * not even be created, {@link #client} stays {@code null}.
     */
    public ClientPub2() {
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        // One try block: if creating the client fails we must not go on to call
        // connect() on a null reference.
        try {
            client = new MqttClient("tcp://localhost:1883", "Sending");
            client.setCallback(this);
            // The options only take effect when they are actually handed to connect().
            client.connect(connOpts);
            client.subscribe("foo2");
        } catch (MqttException e) {
            // MqttSecurityException is a subclass, so this covers it as well.
            e.printStackTrace();
        }
    }

    /**
     * Starts a fixed-rate timer (no initial delay, 1000 ms period) that increments
     * {@link #sendCount}, logs the text and publishes it to {@code foo2} with QoS {@link #qos}.
     * Does nothing if the client could not be created.
     */
    public void sendMessage() {
        if (client == null) {
            System.err.println("foo2 : MQTT client was not created; not scheduling any messages");
            return;
        }
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                sendCount++;
                String text = "A single message from my computer fff " + sendCount;
                System.out.println("foo2 : Sending message " + text);
                // A fresh message per tick: a message object handed to publish() is never
                // modified afterwards. Explicit charset keeps the bytes platform-independent.
                MqttMessage message =
                        new MqttMessage(text.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                message.setQos(qos);
                try {
                    client.publish("foo2", message);
                } catch (MqttException e) {
                    // MqttPersistenceException is a subclass, so this covers it as well.
                    e.printStackTrace();
                }
            }
        }, 0, 1000);
    }

    /**
     * Reports a lost connection instead of dropping it silently.
     *
     * @param arg0 the reason the connection was lost; may be {@code null}
     */
    @Override
    public void connectionLost(Throwable arg0) {
        System.err.println("foo2 : connection lost");
        if (arg0 != null) {
            arg0.printStackTrace();
        }
    }

    /** No-op: delivery confirmations are not tracked in this demo. */
    @Override
    public void deliveryComplete(IMqttDeliveryToken arg0) {
        // Nothing to do.
    }

    /** No-op: messages received on the {@code foo2} subscription are ignored. */
    @Override
    public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
        // Nothing to do.
    }
}
另一个客户端发送消息:
import java.util.Timer;
import java.util.TimerTask;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
/**
 * Demo publisher: once started via {@link #sendMessage()}, publishes a numbered text message to
 * topic {@code foo1} every second using client id {@code "Sending2"}.
 */
public class ClientPub1 implements MqttCallback {
    MqttClient client;
    int qos = 2;
    Timer timer = new Timer();
    public static int sendCount = 0;

    /**
     * Creates the client, connects it to {@code tcp://localhost:1883} and subscribes to
     * {@code foo1}.
     *
     * <p>Any MQTT failure is printed and the instance is still returned; if the client could
     * not even be created, {@link #client} stays {@code null}.
     */
    public ClientPub1() {
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        // One try block: if creating the client fails we must not go on to call
        // connect() on a null reference.
        try {
            client = new MqttClient("tcp://localhost:1883", "Sending2");
            client.setCallback(this);
            // The options only take effect when they are actually handed to connect().
            client.connect(connOpts);
            client.subscribe("foo1");
        } catch (MqttException e) {
            // MqttSecurityException is a subclass, so this covers it as well.
            e.printStackTrace();
        }
    }

    /**
     * Reports a lost connection instead of dropping it silently.
     *
     * @param arg0 the reason the connection was lost; may be {@code null}
     */
    @Override
    public void connectionLost(Throwable arg0) {
        System.err.println("foo1 : connection lost");
        if (arg0 != null) {
            arg0.printStackTrace();
        }
    }

    /**
     * Starts a fixed-rate timer (no initial delay, 1000 ms period) that increments
     * {@link #sendCount}, logs the text and publishes it to {@code foo1} with QoS {@link #qos}.
     * Does nothing if the client could not be created.
     */
    public void sendMessage() {
        if (client == null) {
            System.err.println("foo1 : MQTT client was not created; not scheduling any messages");
            return;
        }
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                sendCount++;
                String text = "A single message from my computer fff " + sendCount;
                System.out.println("foo1 : Sending message " + text);
                // A fresh message per tick: a message object handed to publish() is never
                // modified afterwards. Explicit charset keeps the bytes platform-independent.
                MqttMessage message =
                        new MqttMessage(text.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                message.setQos(qos);
                try {
                    client.publish("foo1", message);
                } catch (MqttException e) {
                    // MqttPersistenceException is a subclass, so this covers it as well.
                    e.printStackTrace();
                }
            }
        }, 0, 1000);
    }

    /** No-op: delivery confirmations are not tracked in this demo. */
    @Override
    public void deliveryComplete(IMqttDeliveryToken arg0) {
        // Nothing to do.
    }

    /** No-op: messages received on the {@code foo1} subscription are ignored. */
    @Override
    public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
        // Nothing to do.
    }
}
你说得对,MQTT 客户端库只使用一个线程来传递收到的消息。
如果想并行处理收到的消息,就需要自己实现本地队列和线程池。java.util.concurrent 包中提供了构建这类机制的工具,可以从 java.util.concurrent.ThreadPoolExecutor 类入手。
我尝试了下面的应用程序:两个客户端不断地发送消息,另一个客户端负责接收。但看起来 messageArrived 方法是在单个线程中被调用的,因为一旦某个客户端的消息在处理时被卡住,其他客户端的消息也就收不到了。这是我的实现有问题,还是库本身的行为就是如此?请指教。我的代码如下。
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Receiving side of the demo.
 *
 * <p>Connects to the broker at {@code tcp://localhost:1883}, subscribes to the topics
 * {@code foo1} and {@code foo2}, and then starts the two publishers ({@code ClientPub2} and
 * {@code ClientPub1}).
 *
 * <p>{@link #messageArrived} deliberately never returns for {@code foo1}; the demo exists to
 * observe what happens to {@code foo2} deliveries while one callback invocation is stuck.
 */
public class TestMain implements MqttCallback {
    static TestMain t = new TestMain();
    static MqttClient client;

    // Not used anywhere in this class; left in place so that any code referring to these
    // package-visible fields keeps compiling.
    static Timer timer = new Timer();
    static Calendar cal = Calendar.getInstance();
    static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
    static int count = 0;

    /**
     * Connects the receiving client, subscribes to both topics and starts both publishers.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try {
            // Keep client state in memory instead of the client's default persistence.
            client = new MqttClient("tcp://localhost:1883", "Receiving", new MemoryPersistence());

            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);

            // Register the callback before connecting so that no event is delivered
            // while no listener is installed.
            client.setCallback(t);
            // The options only take effect when they are actually handed to connect().
            client.connect(connOpts);
            client.subscribe(new String[] { "foo1", "foo2" });

            ClientPub2 c = new ClientPub2();
            c.sendMessage();
            ClientPub1 c1 = new ClientPub1();
            c1.sendMessage();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reports a lost connection instead of dropping it silently.
     *
     * @param cause the reason the connection was lost; may be {@code null}
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.err.println("Receiving : connection lost");
        if (cause != null) {
            cause.printStackTrace();
        }
    }

    /**
     * Handles an incoming message.
     *
     * <p>For {@code foo1} this method spins forever on purpose (see the class comment); for
     * {@code foo2} it prints the message.
     *
     * @param topic topic the message was published to
     * @param message the received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // MQTT topic names are case-sensitive, so compare exactly.
        if ("foo1".equals(topic)) {
            // Intentional: simulate a handler that never finishes.
            while (true) {
                int i = 0;
            }
        } else if ("foo2".equals(topic)) {
            System.out.println("Received message: foo2 : " + message);
        }
    }

    /**
     * No-op: this client does not publish, so there is nothing to track here.
     *
     * @param token delivery token of the completed publish
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // Nothing to do.
    }
}
另一个客户端发送消息:
import java.util.Timer;
import java.util.TimerTask;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
/**
 * Demo publisher: once started via {@link #sendMessage()}, publishes a numbered text message to
 * topic {@code foo2} every second using client id {@code "Sending"}.
 */
public class ClientPub2 implements MqttCallback {
    MqttClient client;
    int qos = 2;
    Timer timer = new Timer();
    public static int sendCount = 0;

    /**
     * Creates the client, connects it to {@code tcp://localhost:1883} and subscribes to
     * {@code foo2}.
     *
     * <p>Any MQTT failure is printed and the instance is still returned; if the client could
     * not even be created, {@link #client} stays {@code null}.
     */
    public ClientPub2() {
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        // One try block: if creating the client fails we must not go on to call
        // connect() on a null reference.
        try {
            client = new MqttClient("tcp://localhost:1883", "Sending");
            client.setCallback(this);
            // The options only take effect when they are actually handed to connect().
            client.connect(connOpts);
            client.subscribe("foo2");
        } catch (MqttException e) {
            // MqttSecurityException is a subclass, so this covers it as well.
            e.printStackTrace();
        }
    }

    /**
     * Starts a fixed-rate timer (no initial delay, 1000 ms period) that increments
     * {@link #sendCount}, logs the text and publishes it to {@code foo2} with QoS {@link #qos}.
     * Does nothing if the client could not be created.
     */
    public void sendMessage() {
        if (client == null) {
            System.err.println("foo2 : MQTT client was not created; not scheduling any messages");
            return;
        }
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                sendCount++;
                String text = "A single message from my computer fff " + sendCount;
                System.out.println("foo2 : Sending message " + text);
                // A fresh message per tick: a message object handed to publish() is never
                // modified afterwards. Explicit charset keeps the bytes platform-independent.
                MqttMessage message =
                        new MqttMessage(text.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                message.setQos(qos);
                try {
                    client.publish("foo2", message);
                } catch (MqttException e) {
                    // MqttPersistenceException is a subclass, so this covers it as well.
                    e.printStackTrace();
                }
            }
        }, 0, 1000);
    }

    /**
     * Reports a lost connection instead of dropping it silently.
     *
     * @param arg0 the reason the connection was lost; may be {@code null}
     */
    @Override
    public void connectionLost(Throwable arg0) {
        System.err.println("foo2 : connection lost");
        if (arg0 != null) {
            arg0.printStackTrace();
        }
    }

    /** No-op: delivery confirmations are not tracked in this demo. */
    @Override
    public void deliveryComplete(IMqttDeliveryToken arg0) {
        // Nothing to do.
    }

    /** No-op: messages received on the {@code foo2} subscription are ignored. */
    @Override
    public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
        // Nothing to do.
    }
}
另一个客户端发送消息:
import java.util.Timer;
import java.util.TimerTask;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
/**
 * Demo publisher: once started via {@link #sendMessage()}, publishes a numbered text message to
 * topic {@code foo1} every second using client id {@code "Sending2"}.
 */
public class ClientPub1 implements MqttCallback {
    MqttClient client;
    int qos = 2;
    Timer timer = new Timer();
    public static int sendCount = 0;

    /**
     * Creates the client, connects it to {@code tcp://localhost:1883} and subscribes to
     * {@code foo1}.
     *
     * <p>Any MQTT failure is printed and the instance is still returned; if the client could
     * not even be created, {@link #client} stays {@code null}.
     */
    public ClientPub1() {
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        // One try block: if creating the client fails we must not go on to call
        // connect() on a null reference.
        try {
            client = new MqttClient("tcp://localhost:1883", "Sending2");
            client.setCallback(this);
            // The options only take effect when they are actually handed to connect().
            client.connect(connOpts);
            client.subscribe("foo1");
        } catch (MqttException e) {
            // MqttSecurityException is a subclass, so this covers it as well.
            e.printStackTrace();
        }
    }

    /**
     * Reports a lost connection instead of dropping it silently.
     *
     * @param arg0 the reason the connection was lost; may be {@code null}
     */
    @Override
    public void connectionLost(Throwable arg0) {
        System.err.println("foo1 : connection lost");
        if (arg0 != null) {
            arg0.printStackTrace();
        }
    }

    /**
     * Starts a fixed-rate timer (no initial delay, 1000 ms period) that increments
     * {@link #sendCount}, logs the text and publishes it to {@code foo1} with QoS {@link #qos}.
     * Does nothing if the client could not be created.
     */
    public void sendMessage() {
        if (client == null) {
            System.err.println("foo1 : MQTT client was not created; not scheduling any messages");
            return;
        }
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                sendCount++;
                String text = "A single message from my computer fff " + sendCount;
                System.out.println("foo1 : Sending message " + text);
                // A fresh message per tick: a message object handed to publish() is never
                // modified afterwards. Explicit charset keeps the bytes platform-independent.
                MqttMessage message =
                        new MqttMessage(text.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                message.setQos(qos);
                try {
                    client.publish("foo1", message);
                } catch (MqttException e) {
                    // MqttPersistenceException is a subclass, so this covers it as well.
                    e.printStackTrace();
                }
            }
        }, 0, 1000);
    }

    /** No-op: delivery confirmations are not tracked in this demo. */
    @Override
    public void deliveryComplete(IMqttDeliveryToken arg0) {
        // Nothing to do.
    }

    /** No-op: messages received on the {@code foo1} subscription are ignored. */
    @Override
    public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
        // Nothing to do.
    }
}
你说得对,MQTT 客户端库只使用一个线程来传递收到的消息。
如果想并行处理收到的消息,就需要自己实现本地队列和线程池。java.util.concurrent 包中提供了构建这类机制的工具,可以从 java.util.concurrent.ThreadPoolExecutor 类入手。