MQTT 和 SpringBoot 集成连接丢失
MQTT and SpringBoot Integration Connection Lost
我目前在 SpringBoot 中有一个 API,我想添加一个 MQTT 客户端来订阅一个或多个主题。
我尝试过 Paho、Hive 等几个客户端,但都没有成功。目前我使用的是 Spring Boot 基于 Paho 的默认 MQTT 集成(spring-integration-mqtt),但即使采用最基本的配置也无法让它正常工作。
我一启动应用程序就收到“连接丢失”错误...
你能告诉我一个修复或其他可行的方法吗?谢谢!
Maven:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
....
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import lombok.extern.slf4j.Slf4j;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
/**
 * Spring Boot entry point that also wires an inbound MQTT subscription through
 * Spring Integration.
 *
 * <p>Messages received by {@link #inbound()} are sent to {@link #mqttInputChannel()}
 * and consumed by {@link #handler()}.
 */
@Slf4j
@SpringBootApplication
@EnableSwagger2
public class MainApiSpring {

    /** URL of the MQTT broker the inbound adapter connects to. */
    private static final String BROKER_URL = "tcp://localhost:1883";

    /**
     * Client identifier presented to the broker. It must be unique per connection:
     * a broker typically drops an existing connection when a second client connects
     * with the same identifier.
     */
    private static final String CLIENT_ID = "mainApiSpringInbound";

    /** Topic the inbound adapter subscribes to. */
    private static final String TOPIC = "test/topic";

    /**
     * Starts the Spring application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(MainApiSpring.class, args);
        log.trace("L'application a correctement été démarrée.");
    }

    /**
     * Channel that carries messages from the MQTT adapter to the handler.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Creates the message-driven adapter that subscribes to {@link #TOPIC} on the broker.
     *
     * <p>The three-argument form is used on purpose. With only two strings the
     * constructor resolves to {@code (url, clientId, String... topic)}, which turns
     * the intended topic into the client ID and subscribes to nothing.
     *
     * @return the configured inbound adapter
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(BROKER_URL, CLIENT_ID, TOPIC);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * Handler bound to {@code mqttInputChannel} that prints each message payload
     * to standard output.
     *
     * @return the message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> System.out.println(message.getPayload());
    }
}
运行时出现的错误:
2020-09-04 10:31:39.099 ERROR 4244 --- [ main] .m.i.MqttPahoMessageDrivenChannelAdapter : Exception while connecting and subscribing, retrying
org.eclipse.paho.client.mqttv3.MqttException: Connexion perdue
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:197) ~[org.eclipse.paho.client.mqttv3-1.2.4.jar:na]
at java.base/java.lang.Thread.run(Thread.java:830) ~[na:na]
Caused by: java.io.EOFException: null
at java.base/java.io.DataInputStream.readByte(DataInputStream.java:272) ~[na:na]
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92) ~[org.eclipse.paho.client.mqttv3-1.2.4.jar:na]
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:137) ~[org.eclipse.paho.client.mqttv3-1.2.4.jar:na]
... 1 common frames omitted
答案:显式定义 MqttConnectOptions 之后就可以正常工作了!
/**
 * Builds the connection options used by the MQTT client factory.
 *
 * <p>The options request a clean session and automatic reconnect, set a connection
 * timeout of 30 and a keep-alive interval of 60 (both in seconds per the Paho API),
 * and point at the local broker.
 *
 * @return connection options targeting {@code tcp://localhost:1883}
 */
@Bean
public MqttConnectOptions getReceiverMqttConnectOptions() {
    MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    mqttConnectOptions.setCleanSession(true);
    mqttConnectOptions.setConnectionTimeout(30);
    mqttConnectOptions.setKeepAliveInterval(60);
    mqttConnectOptions.setAutomaticReconnect(true);
    // For a broker that requires authentication, also call setUserName(...) and
    // setPassword(char[]) here, loading the credentials from external configuration
    // instead of hard-coding them in source.
    mqttConnectOptions.setServerURIs(new String[] {"tcp://localhost:1883"});
    return mqttConnectOptions;
}
/**
 * Provides the Paho client factory, configured with the options returned by
 * {@code getReceiverMqttConnectOptions()}.
 *
 * @return the client factory used to create MQTT clients
 */
@Bean
public MqttPahoClientFactory mqttClientFactory() {
    final DefaultMqttPahoClientFactory pahoFactory = new DefaultMqttPahoClientFactory();
    final MqttConnectOptions connectOptions = getReceiverMqttConnectOptions();
    pahoFactory.setConnectionOptions(connectOptions);
    return pahoFactory;
}
/**
 * Creates the message-driven adapter that subscribes to the {@code test} and
 * {@code test/paho} topics through the shared client factory and forwards
 * received messages to {@code mqttInputChannel()}.
 *
 * @return the configured inbound adapter
 */
@Bean
public MessageProducer inbound() {
    // A random suffix gives every adapter instance its own client identifier.
    final String generatedClientId = "uuid-" + UUID.randomUUID();
    final MqttPahoMessageDrivenChannelAdapter inboundAdapter =
            new MqttPahoMessageDrivenChannelAdapter(
                    generatedClientId, mqttClientFactory(), "test", "test/paho");
    inboundAdapter.setOutputChannel(mqttInputChannel());
    inboundAdapter.setQos(2);
    inboundAdapter.setConverter(new DefaultPahoMessageConverter());
    inboundAdapter.setCompletionTimeout(20000);
    return inboundAdapter;
}
我目前在 SpringBoot 中有一个 API,我想添加一个 MQTT 客户端来订阅一个或多个主题。
我尝试过 Paho、Hive 等几个客户端,但都没有成功。目前我使用的是 Spring Boot 基于 Paho 的默认 MQTT 集成(spring-integration-mqtt),但即使采用最基本的配置也无法让它正常工作。
我一启动应用程序就收到“连接丢失”错误...
你能告诉我一个修复或其他可行的方法吗?谢谢!
Maven:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
....
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import lombok.extern.slf4j.Slf4j;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
/**
 * Spring Boot entry point that also wires an inbound MQTT subscription through
 * Spring Integration.
 *
 * <p>Messages received by {@link #inbound()} are sent to {@link #mqttInputChannel()}
 * and consumed by {@link #handler()}.
 */
@Slf4j
@SpringBootApplication
@EnableSwagger2
public class MainApiSpring {

    /** URL of the MQTT broker the inbound adapter connects to. */
    private static final String BROKER_URL = "tcp://localhost:1883";

    /**
     * Client identifier presented to the broker. It must be unique per connection:
     * a broker typically drops an existing connection when a second client connects
     * with the same identifier.
     */
    private static final String CLIENT_ID = "mainApiSpringInbound";

    /** Topic the inbound adapter subscribes to. */
    private static final String TOPIC = "test/topic";

    /**
     * Starts the Spring application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(MainApiSpring.class, args);
        log.trace("L'application a correctement été démarrée.");
    }

    /**
     * Channel that carries messages from the MQTT adapter to the handler.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Creates the message-driven adapter that subscribes to {@link #TOPIC} on the broker.
     *
     * <p>The three-argument form is used on purpose. With only two strings the
     * constructor resolves to {@code (url, clientId, String... topic)}, which turns
     * the intended topic into the client ID and subscribes to nothing.
     *
     * @return the configured inbound adapter
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(BROKER_URL, CLIENT_ID, TOPIC);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * Handler bound to {@code mqttInputChannel} that prints each message payload
     * to standard output.
     *
     * @return the message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> System.out.println(message.getPayload());
    }
}
运行时出现的错误:
2020-09-04 10:31:39.099 ERROR 4244 --- [ main] .m.i.MqttPahoMessageDrivenChannelAdapter : Exception while connecting and subscribing, retrying
org.eclipse.paho.client.mqttv3.MqttException: Connexion perdue
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:197) ~[org.eclipse.paho.client.mqttv3-1.2.4.jar:na]
at java.base/java.lang.Thread.run(Thread.java:830) ~[na:na]
Caused by: java.io.EOFException: null
at java.base/java.io.DataInputStream.readByte(DataInputStream.java:272) ~[na:na]
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92) ~[org.eclipse.paho.client.mqttv3-1.2.4.jar:na]
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:137) ~[org.eclipse.paho.client.mqttv3-1.2.4.jar:na]
... 1 common frames omitted
答案:显式定义 MqttConnectOptions 之后就可以正常工作了!
/**
 * Builds the connection options used by the MQTT client factory.
 *
 * <p>The options request a clean session and automatic reconnect, set a connection
 * timeout of 30 and a keep-alive interval of 60 (both in seconds per the Paho API),
 * and point at the local broker.
 *
 * @return connection options targeting {@code tcp://localhost:1883}
 */
@Bean
public MqttConnectOptions getReceiverMqttConnectOptions() {
    MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    mqttConnectOptions.setCleanSession(true);
    mqttConnectOptions.setConnectionTimeout(30);
    mqttConnectOptions.setKeepAliveInterval(60);
    mqttConnectOptions.setAutomaticReconnect(true);
    // For a broker that requires authentication, also call setUserName(...) and
    // setPassword(char[]) here, loading the credentials from external configuration
    // instead of hard-coding them in source.
    mqttConnectOptions.setServerURIs(new String[] {"tcp://localhost:1883"});
    return mqttConnectOptions;
}
/**
 * Provides the Paho client factory, configured with the options returned by
 * {@code getReceiverMqttConnectOptions()}.
 *
 * @return the client factory used to create MQTT clients
 */
@Bean
public MqttPahoClientFactory mqttClientFactory() {
    final DefaultMqttPahoClientFactory pahoFactory = new DefaultMqttPahoClientFactory();
    final MqttConnectOptions connectOptions = getReceiverMqttConnectOptions();
    pahoFactory.setConnectionOptions(connectOptions);
    return pahoFactory;
}
/**
 * Creates the message-driven adapter that subscribes to the {@code test} and
 * {@code test/paho} topics through the shared client factory and forwards
 * received messages to {@code mqttInputChannel()}.
 *
 * @return the configured inbound adapter
 */
@Bean
public MessageProducer inbound() {
    // A random suffix gives every adapter instance its own client identifier.
    final String generatedClientId = "uuid-" + UUID.randomUUID();
    final MqttPahoMessageDrivenChannelAdapter inboundAdapter =
            new MqttPahoMessageDrivenChannelAdapter(
                    generatedClientId, mqttClientFactory(), "test", "test/paho");
    inboundAdapter.setOutputChannel(mqttInputChannel());
    inboundAdapter.setQos(2);
    inboundAdapter.setConverter(new DefaultPahoMessageConverter());
    inboundAdapter.setCompletionTimeout(20000);
    return inboundAdapter;
}