How to implement MQTT server using Spring Integration?
When I run the MQTT Outbound Channel Adapter example, it throws an error:
Executing command line: /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/java -classpath /Users/afarber/src/spring-newbie/MqttOutbound/target/classes:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot/1.4.1.RELEASE/spring-boot-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot-starter/1.4.1.RELEASE/spring-boot-starter-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot-autoconfigure/1.4.1.RELEASE/spring-boot-autoconfigure-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot-starter-logging/1.4.1.RELEASE/spring-boot-starter-logging-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/ch/qos/logback/logback-classic/1.1.7/logback-classic-1.1.7.jar:/Users/afarber/.m2/repository/ch/qos/logback/logback-core/1.1.7/logback-core-1.1.7.jar:/Users/afarber/.m2/repository/org/slf4j/jcl-over-slf4j/1.7.21/jcl-over-slf4j-1.7.21.jar:/Users/afarber/.m2/repository/org/slf4j/jul-to-slf4j/1.7.21/jul-to-slf4j-1.7.21.jar:/Users/afarber/.m2/repository/org/slf4j/log4j-over-slf4j/1.7.21/log4j-over-slf4j-1.7.21.jar:/Users/afarber/.m2/repository/org/yaml/snakeyaml/1.17/snakeyaml-1.17.jar:/Users/afarber/.m2/repository/org/slf4j/slf4j-api/1.7.16/slf4j-api-1.7.16.jar:/Users/afarber/.m2/repository/org/springframework/spring-context/4.3.2.RELEASE/spring-context-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-aop/4.3.2.RELEASE/spring-aop-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-beans/4.3.2.RELEASE/spring-beans-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-expression/4.3.2.RELEASE/spring-expression-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-core/4.3.2.RELEASE/spring-core-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/commons-logging/commons-logging/1.2/commons-logging-1.2.jar:/Users/afarber/.m2/repository/org/springframework/integration/spring-integrati
on-core/4.3.2.RELEASE/spring-integration-core-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-messaging/4.3.3.RELEASE/spring-messaging-4.3.3.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-tx/4.3.3.RELEASE/spring-tx-4.3.3.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/retry/spring-retry/1.1.3.RELEASE/spring-retry-1.1.3.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-mqtt/4.3.2.RELEASE/spring-integration-mqtt-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/eclipse/paho/org.eclipse.paho.client.mqttv3/1.0.2/org.eclipse.paho.client.mqttv3-1.0.2.jar de.afarber.mqttoutbound.MqttJavaApplication
. ____ _ __ _ _
/\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v1.4.1.RELEASE)
2016-10-11 21:53:36.811 INFO 2102 --- [ main] d.a.mqttoutbound.MqttJavaApplication : Starting MqttJavaApplication on mba.local with PID 2102 (/Users/afarber/src/spring-newbie/MqttOutbound/target/classes started by afarber in /Users/afarber/src/spring-newbie/MqttOutbound)
2016-10-11 21:53:36.816 INFO 2102 --- [ main] d.a.mqttoutbound.MqttJavaApplication : No active profile set, falling back to default profiles: default
2016-10-11 21:53:36.960 INFO 2102 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@35a50a4c: startup date [Tue Oct 11 21:53:36 CEST 2016]; root of context hierarchy
2016-10-11 21:53:37.724 INFO 2102 --- [ main] o.s.b.f.config.PropertiesFactoryBean : Loading properties file from URL [jar:file:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-core/4.3.2.RELEASE/spring-integration-core-4.3.2.RELEASE.jar!/META-INF/spring.integration.default.properties]
2016-10-11 21:53:37.729 INFO 2102 --- [ main] o.s.i.config.IntegrationRegistrar : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2016-10-11 21:53:37.933 INFO 2102 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2016-10-11 21:53:37.947 INFO 2102 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2016-10-11 21:53:38.143 INFO 2102 --- [ main] o.s.b.f.config.PropertiesFactoryBean : Loading properties file from URL [jar:file:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-core/4.3.2.RELEASE/spring-integration-core-4.3.2.RELEASE.jar!/META-INF/spring.integration.default.properties]
2016-10-11 21:53:38.148 INFO 2102 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationGlobalProperties' of type [class org.springframework.beans.factory.config.PropertiesFactoryBean] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2016-10-11 21:53:38.177 INFO 2102 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationGlobalProperties' of type [class java.util.Properties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2016-10-11 21:53:38.592 INFO 2102 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2016-10-11 21:53:39.064 INFO 2102 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2016-10-11 21:53:39.077 INFO 2102 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase -2147483648
2016-10-11 21:53:39.078 INFO 2102 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {message-handler:mqttJavaApplication.mqttOutbound.serviceActivator} as a subscriber to the 'mqttOutboundChannel' channel
2016-10-11 21:53:39.078 INFO 2102 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.mqttOutboundChannel' has 1 subscriber(s).
2016-10-11 21:53:39.079 INFO 2102 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started mqttJavaApplication.mqttOutbound.serviceActivator
2016-10-11 21:53:39.079 INFO 2102 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2016-10-11 21:53:39.079 INFO 2102 --- [ main] ProxyFactoryBean$MethodInvocationGateway : started mqttJavaApplication$MyGateway
2016-10-11 21:53:39.079 INFO 2102 --- [ main] GatewayCompletableFutureProxyFactoryBean : started mqttJavaApplication$MyGateway
2016-10-11 21:53:39.080 INFO 2102 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2016-10-11 21:53:39.080 INFO 2102 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2016-10-11 21:53:39.080 INFO 2102 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started _org.springframework.integration.errorLogger
2016-10-11 21:53:39.093 INFO 2102 --- [ main] d.a.mqttoutbound.MqttJavaApplication : Started MqttJavaApplication in 2.962 seconds (JVM running for 3.669)
Exception in thread "main" org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.MessagingException: Failed to connect; nested exception is Unable to connect to server (32103) - java.net.ConnectException: Connection refused
at org.springframework.integration.dispatcher.AbstractDispatcher.wrapExceptionIfNecessary(AbstractDispatcher.java:133)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:120)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:135)
at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:375)
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:477)
at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:429)
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:420)
at org.springframework.integration.gateway.GatewayCompletableFutureProxyFactoryBean.invoke(GatewayCompletableFutureProxyFactoryBean.java:65)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
at com.sun.proxy.$Proxy40.sendToMqtt(Unknown Source)
at de.afarber.mqttoutbound.MqttJavaApplication.main(MqttJavaApplication.java:27)
Caused by: org.springframework.messaging.MessagingException: Failed to connect; nested exception is Unable to connect to server (32103) - java.net.ConnectException: Connection refused
at org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler.checkConnection(MqttPahoMessageHandler.java:180)
at org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler.publish(MqttPahoMessageHandler.java:189)
at org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler.handleMessageInternal(AbstractMqttMessageHandler.java:150)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.config.annotation.ServiceActivatorAnnotationPostProcessor$ReplyProducingMessageHandlerWrapper.handleRequestMessage(ServiceActivatorAnnotationPostProcessor.java:98)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
... 19 more
Caused by: Unable to connect to server (32103) - java.net.ConnectException: Connection refused
at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79)
at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:590)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70)
... 2 more
2016-10-11 21:53:39.203 INFO 2102 --- [ Thread-1] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@35a50a4c: startup date [Tue Oct 11 21:53:36 CEST 2016]; root of context hierarchy
2016-10-11 21:53:39.207 INFO 2102 --- [ Thread-1] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 0
2016-10-11 21:53:39.209 INFO 2102 --- [ Thread-1] ProxyFactoryBean$MethodInvocationGateway : stopped mqttJavaApplication$MyGateway
2016-10-11 21:53:39.210 INFO 2102 --- [ Thread-1] GatewayCompletableFutureProxyFactoryBean : stopped mqttJavaApplication$MyGateway
2016-10-11 21:53:39.210 INFO 2102 --- [ Thread-1] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2016-10-11 21:53:39.210 INFO 2102 --- [ Thread-1] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 0 subscriber(s).
2016-10-11 21:53:39.210 INFO 2102 --- [ Thread-1] o.s.i.endpoint.EventDrivenConsumer : stopped _org.springframework.integration.errorLogger
2016-10-11 21:53:39.210 INFO 2102 --- [ Thread-1] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase -2147483648
2016-10-11 21:53:39.210 INFO 2102 --- [ Thread-1] o.s.i.endpoint.EventDrivenConsumer : Removing {message-handler:mqttJavaApplication.mqttOutbound.serviceActivator} as a subscriber to the 'mqttOutboundChannel' channel
2016-10-11 21:53:39.210 INFO 2102 --- [ Thread-1] o.s.integration.channel.DirectChannel : Channel 'application.mqttOutboundChannel' has 0 subscriber(s).
2016-10-11 21:53:39.211 INFO 2102 --- [ Thread-1] o.s.i.endpoint.EventDrivenConsumer : stopped mqttJavaApplication.mqttOutbound.serviceActivator
2016-10-11 21:53:39.211 INFO 2102 --- [ Thread-1] o.s.j.e.a.AnnotationMBeanExporter : Unregistering JMX-exposed beans on shutdown
2016-10-11 21:53:39.212 INFO 2102 --- [ Thread-1] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 'taskScheduler'
------------------------------------------------------------------------
BUILD FAILURE
The failing code from MqttJavaApplication.java is copied below:
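package de.afarber.mqttoutbound;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo"); // THROWS THE ABOVE EXCEPTION
    }

    // Client factory: expects a broker to be listening at localhost:1883
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs("tcp://localhost:1883");
        return factory;
    }

    // Outbound adapter: publishes messages from mqttOutboundChannel to testTopic
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {
        void sendToMqtt(String data);
    }
}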
And it seems to be an MQTT client...
How can I implement an MQTT server (broker) as a Java Spring Integration bean, and where should I start?
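The top of the error message says:
java.net.ConnectException: Connection refused
This means that the host you configured as the broker (localhost) is either not running a broker or has a firewall enabled that blocks the connection.
A firewall is unlikely to block connections to localhost, so you need to check that the broker is actually running.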
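One way to check this from Java, without involving Spring at all, is to try opening a plain TCP connection to the broker address. The following is only a minimal sketch: BrokerProbe and isListening are hypothetical names, not part of Spring Integration or Paho, and the host and port are taken from the tcp://localhost:1883 URI in the question.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper, not part of Spring Integration or Paho. */
final class BrokerProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers java.net.ConnectException ("Connection refused") and timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        // 1883 is the port used in the question's server URI.
        boolean up = isListening("localhost", 1883, 2000);
        System.out.println(up
                ? "Something is listening on localhost:1883"
                : "Nothing is listening on localhost:1883; start a broker first");
    }
}
```

A successful connection only shows that some process accepts TCP connections on that port; it does not prove that the process speaks MQTT.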
If you don't have a broker, you need to install one. The easiest is probably Mosquitto, available from here:
https://mosquitto.org/download/
Other brokers are also available.
Spring Integration does not provide a broker; it provides clients for sending and receiving messages.
You can create the MqttPahoMessageHandler bean in your Spring Boot main class, Application.java:
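// mqttServerUrl, mqttUsername, mqttPassword and clientId are fields of the
// enclosing class (not shown), e.g. populated from application properties.

@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[] { mqttServerUrl });
    options.setUserName(mqttUsername);
    options.setPassword(mqttPassword.toCharArray());
    // Note: setConnectionOptions(...) may not exist in older releases such as the
    // 4.3.2 used in the question; there, call factory.setServerURIs(...) directly.
    factory.setConnectionOptions(options);
    return factory;
}

@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MqttPahoMessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
    messageHandler.setAsync(true);
    // No default topic is set here, so each message must carry a topic header
    // (MqttHeaders.TOPIC), or call messageHandler.setDefaultTopic(...).
    return messageHandler;
}

@Bean
public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
}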
The beans are then created automatically when the application starts.
To publish messages, you can then implement your own service layer.
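As a rough idea of what such a service layer could look like, here is a plain-Java sketch. It assumes a gateway with the same shape as the MyGateway interface from the question (redeclared here so the snippet compiles on its own). SensorPublishingService, publishReading and the "sensorId=value" payload format are made up for illustration. In a real Spring application the class would typically be annotated with @Service and receive the gateway proxy through constructor injection.

```java
import java.util.Objects;

/** Same shape as the MyGateway interface from the question. */
interface MyGateway {
    void sendToMqtt(String data);
}

/** Hypothetical service layer on top of the gateway. */
final class SensorPublishingService {

    private final MyGateway gateway;

    SensorPublishingService(MyGateway gateway) {
        this.gateway = Objects.requireNonNull(gateway, "gateway");
    }

    /** Formats a reading as "sensorId=value" and hands it to the gateway. */
    void publishReading(String sensorId, double value) {
        if (sensorId == null || sensorId.isEmpty()) {
            throw new IllegalArgumentException("sensorId must not be empty");
        }
        gateway.sendToMqtt(sensorId + "=" + value);
    }
}
```

Keeping the gateway behind a small service like this means the rest of the application never touches channels or MQTT classes directly, and the service can be unit-tested with a fake gateway while no broker is running.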