MQTT- Spring 集成
MQTT- Spring Integration
我正在尝试运行 MQTT-Spring Integration 示例项目。我修改了一些配置细节,使其从属性文件中读取,
并尝试通过 RunMqtt.java 文件连接到消息代理。
代码片段如下
MQTT-Context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
xsi:schemaLocation="
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.1.xsd">
<!-- MQTT inbound flow: the adapter subscribes to "topic", puts each message on the
     "startCase" channel, and the service activator calls MqttCaseService.startCase. -->
<!-- NOTE(review): this file uses the placeholders ${serveruri} and ${mqttbrokerurl} but
     declares no context:property-placeholder of its own. They are only resolved when the
     context that loads this file registers one; loaded stand-alone, the adapter receives
     the literal text "${mqttbrokerurl}" as its URL. -->
<!-- Paho client factory carrying credentials and server URIs.
     NOTE(review): the adapter below sets no client-factory attribute, so this bean does
     not appear to be used by it; confirm whether that is intended. -->
<bean id="clientFactory"
class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
<property name="userName" value="username" />
<property name="password" value="password" />
<property name="serverURIs" value="${serveruri}"></property>
</bean>
<!-- Intercept every message and log it (at ERROR level) through the "logger" adapter -->
<int:logging-channel-adapter id="logger"
level="ERROR" />
<int:wire-tap channel="logger" />
<!-- auto-startup="true" makes the MqttPahoMessageDrivenChannelAdapter start with the context,
     so it connects to the broker URL during context refresh -->
<int-mqtt:message-driven-channel-adapter
id="startCaseAdapter" client-id="clientId" url="${mqttbrokerurl}"
topics="topic" channel="startCase" auto-startup="true" />
<!-- Channel linking the inbound adapter to the service activator -->
<int:channel id="startCase" />
<!-- Delivers each message from "startCase" to mqttCaseService.startCase -->
<int:service-activator id="startCaseService"
input-channel="startCase" ref="mqttCaseService" method="startCase" />
<bean id="mqttCaseService" class="com.XXX.integration.ieg.mqtt.MqttCaseService" />
</beans>
属性(mqttbrokerurl、serveruri)是从根上下文加载的。
RunMqtt.java
package com.XXX.integration.ieg.mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
/**
 * Command-line runner for the MQTT integration sample.
 *
 * <p>It boots the Spring context defined in {@code mqtt-context.xml} (which declares the
 * inbound adapter bean {@code startCaseAdapter}) and then publishes one test message to
 * {@code topic} on the broker at {@code tcp://localhost:1883}.
 *
 * <p>NOTE(review): {@code mqtt-context.xml} uses the placeholders {@code ${mqttbrokerurl}}
 * and {@code ${serveruri}}. This runner loads only that file, so a property-placeholder
 * must be reachable from it; otherwise context start-up fails with
 * {@code IllegalArgumentException: ${mqttbrokerurl}}.
 */
public class RunMqtt {

    private static final String CONTEXT_LOCATION = "/META-INF/spring/integration/mqtt/mqtt-context.xml";
    private static final String BROKER_URL = "tcp://localhost:1883";
    private static final String CLIENT_ID = "JavaSample";
    private static final String TOPIC = "topic";
    private static final String PAYLOAD = "This is what I am sending in 2nd attempt";

    /**
     * Starts the Spring context and publishes the test message.
     *
     * <p>The context is intentionally left running after the publish so the inbound adapter
     * can still receive the message; a shutdown hook closes it when the JVM exits.
     *
     * @throws MqttException if connecting to, publishing to, or disconnecting from the broker fails
     */
    public void test() throws MqttException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(CONTEXT_LOCATION);
        // Make sure the adapter and other beans are shut down cleanly on JVM exit.
        context.registerShutdownHook();
        System.out.println(context);

        // Looked up eagerly so a missing or mistyped adapter bean fails here.
        MqttPahoMessageDrivenChannelAdapter startCaseAdapter =
                (MqttPahoMessageDrivenChannelAdapter) context.getBean("startCaseAdapter");
        // Uncomment to start the adapter manually from the program
        // (only needed when it is not started by the configuration).
        //startCaseAdapter.start();

        publishTestMessage();

        // Uncomment to stop the adapter manually from the program.
        //startCaseAdapter.stop();
    }

    /**
     * Publishes {@link #PAYLOAD} with QoS 1 using a throw-away client and always releases it.
     */
    private void publishTestMessage() throws MqttException {
        DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        MqttClient client = clientFactory.getClientInstance(BROKER_URL, CLIENT_ID);
        try {
            // Explicit charset so the payload bytes do not depend on the platform default.
            MqttMessage message = new MqttMessage(PAYLOAD.getBytes(java.nio.charset.Charset.forName("UTF-8")));
            message.setQos(1);

            client.connect();
            try {
                client.publish(TOPIC, message);
            } finally {
                // Disconnect even when publish fails.
                client.disconnect();
            }
        } finally {
            // Release the client's resources whether or not the connection succeeded.
            client.close();
        }
    }

    public static void main(String[] args) throws MqttException {
        new RunMqtt().test();
    }
}
根上下文
根上下文如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- Root context file for the web app. All other context files are imported here. -->
<!-- Security context. Spring Security with basic authentication is implemented for now; OAuth2 is planned. -->
<import resource="security-config.xml" />
<!-- REST service call context -->
<import resource="classpath:META-INF/spring/integration/rest/applicationContext-http-int.xml"/>
<!-- SFTP contexts -->
<import resource="classpath:META-INF/spring/integration/sftp/SftpInboundReceive-context.xml"/>
<import resource="classpath:META-INF/spring/integration/sftp/SftpOutboundTransfer-context.xml"/>
<import resource="classpath:META-INF/spring/integration/sftp/SftpOutboundTransfer-poll.xml"/>
<!-- MQTT context -->
<import resource="classpath:META-INF/spring/integration/mqtt/mqtt-context.xml"/>
<!-- Component scan base package -->
<context:component-scan base-package="com.XXX.integration.ieg"/>
<!-- All property configuration was moved to this parent context file to solve the
     "property not found" exception. The imported files declare no placeholder of their own,
     so their ${...} values are resolved only when they are loaded through this file. -->
<context:property-placeholder order="1"
location="classpath:/sftpuser.properties, classpath:/sftpfile.properties,classpath:/resthttp.properties, classpath:/mqtt.properties" ignore-unresolvable="true"/>
</beans>
在这种情况下,mqtt 或 sftp 或 resthttp 属性加载失败。请帮忙解决
堆栈跟踪
java.lang.IllegalArgumentException: ${mqttbrokerurl}
at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:448)
at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:260)
at org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory.getAsyncClientInstance(DefaultMqttPahoClientFactory.java:119)
at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.connectAndSubscribe(MqttPahoMessageDrivenChannelAdapter.java:189)
at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.doStart(MqttPahoMessageDrivenChannelAdapter.java:110)
at org.springframework.integration.endpoint.AbstractEndpoint.start(AbstractEndpoint.java:94)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173)
at org.springframework.context.support.DefaultLifecycleProcessor.access0(DefaultLifecycleProcessor.java:51)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:346)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:149)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:112)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:770)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:483)
at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83)
at com.XXX.integration.ieg.mqtt.RunMqtt.test(RunMqtt.java:18)
at com.XXX.integration.ieg.mqtt.RunMqtt.main(RunMqtt.java:39)
属性文件
mqtt.properties
mqttbrokerurl=tcp://172.18.128.150:1883
根据堆栈跟踪,您的属性占位符没有被解析,原因只是没有为您的属性文件配置 <context:property-placeholder>。
就算你说是:
The properties(mqttbrokerurl, serveruri) are loaded from root context
我在你的代码中没有看到这些东西:
你只展示了 MQTT-Context.xml,而您的 RunMqtt 仅启动了该上下文:
new ClassPathXmlApplicationContext("/META-INF/spring/integration/mqtt/mqtt-context.xml");
所以,请解决这些问题,并返回给我们进一步的进展。
参见(原文此处的链接已缺失)。当在 Web 应用程序中运行时,类路径是相对于 Web 上下文的。
我正在尝试运行 MQTT-Spring Integration 示例项目。我修改了一些配置细节,使其从属性文件中读取,
并尝试通过 RunMqtt.java 文件连接到消息代理。
代码片段如下
MQTT-Context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
xsi:schemaLocation="
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.1.xsd">
<!-- MQTT inbound flow: the adapter subscribes to "topic", puts each message on the
     "startCase" channel, and the service activator calls MqttCaseService.startCase. -->
<!-- NOTE(review): this file uses the placeholders ${serveruri} and ${mqttbrokerurl} but
     declares no context:property-placeholder of its own. They are only resolved when the
     context that loads this file registers one; loaded stand-alone, the adapter receives
     the literal text "${mqttbrokerurl}" as its URL. -->
<!-- Paho client factory carrying credentials and server URIs.
     NOTE(review): the adapter below sets no client-factory attribute, so this bean does
     not appear to be used by it; confirm whether that is intended. -->
<bean id="clientFactory"
class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
<property name="userName" value="username" />
<property name="password" value="password" />
<property name="serverURIs" value="${serveruri}"></property>
</bean>
<!-- Intercept every message and log it (at ERROR level) through the "logger" adapter -->
<int:logging-channel-adapter id="logger"
level="ERROR" />
<int:wire-tap channel="logger" />
<!-- auto-startup="true" makes the MqttPahoMessageDrivenChannelAdapter start with the context,
     so it connects to the broker URL during context refresh -->
<int-mqtt:message-driven-channel-adapter
id="startCaseAdapter" client-id="clientId" url="${mqttbrokerurl}"
topics="topic" channel="startCase" auto-startup="true" />
<!-- Channel linking the inbound adapter to the service activator -->
<int:channel id="startCase" />
<!-- Delivers each message from "startCase" to mqttCaseService.startCase -->
<int:service-activator id="startCaseService"
input-channel="startCase" ref="mqttCaseService" method="startCase" />
<bean id="mqttCaseService" class="com.XXX.integration.ieg.mqtt.MqttCaseService" />
</beans>
属性(mqttbrokerurl、serveruri)是从根上下文加载的。
RunMqtt.java
package com.XXX.integration.ieg.mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
/**
 * Command-line runner for the MQTT integration sample.
 *
 * <p>It boots the Spring context defined in {@code mqtt-context.xml} (which declares the
 * inbound adapter bean {@code startCaseAdapter}) and then publishes one test message to
 * {@code topic} on the broker at {@code tcp://localhost:1883}.
 *
 * <p>NOTE(review): {@code mqtt-context.xml} uses the placeholders {@code ${mqttbrokerurl}}
 * and {@code ${serveruri}}. This runner loads only that file, so a property-placeholder
 * must be reachable from it; otherwise context start-up fails with
 * {@code IllegalArgumentException: ${mqttbrokerurl}}.
 */
public class RunMqtt {

    private static final String CONTEXT_LOCATION = "/META-INF/spring/integration/mqtt/mqtt-context.xml";
    private static final String BROKER_URL = "tcp://localhost:1883";
    private static final String CLIENT_ID = "JavaSample";
    private static final String TOPIC = "topic";
    private static final String PAYLOAD = "This is what I am sending in 2nd attempt";

    /**
     * Starts the Spring context and publishes the test message.
     *
     * <p>The context is intentionally left running after the publish so the inbound adapter
     * can still receive the message; a shutdown hook closes it when the JVM exits.
     *
     * @throws MqttException if connecting to, publishing to, or disconnecting from the broker fails
     */
    public void test() throws MqttException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(CONTEXT_LOCATION);
        // Make sure the adapter and other beans are shut down cleanly on JVM exit.
        context.registerShutdownHook();
        System.out.println(context);

        // Looked up eagerly so a missing or mistyped adapter bean fails here.
        MqttPahoMessageDrivenChannelAdapter startCaseAdapter =
                (MqttPahoMessageDrivenChannelAdapter) context.getBean("startCaseAdapter");
        // Uncomment to start the adapter manually from the program
        // (only needed when it is not started by the configuration).
        //startCaseAdapter.start();

        publishTestMessage();

        // Uncomment to stop the adapter manually from the program.
        //startCaseAdapter.stop();
    }

    /**
     * Publishes {@link #PAYLOAD} with QoS 1 using a throw-away client and always releases it.
     */
    private void publishTestMessage() throws MqttException {
        DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        MqttClient client = clientFactory.getClientInstance(BROKER_URL, CLIENT_ID);
        try {
            // Explicit charset so the payload bytes do not depend on the platform default.
            MqttMessage message = new MqttMessage(PAYLOAD.getBytes(java.nio.charset.Charset.forName("UTF-8")));
            message.setQos(1);

            client.connect();
            try {
                client.publish(TOPIC, message);
            } finally {
                // Disconnect even when publish fails.
                client.disconnect();
            }
        } finally {
            // Release the client's resources whether or not the connection succeeded.
            client.close();
        }
    }

    public static void main(String[] args) throws MqttException {
        new RunMqtt().test();
    }
}
根上下文 根上下文如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- Root context file for the web app. All other context files are imported here. -->
<!-- Security context. Spring Security with basic authentication is implemented for now; OAuth2 is planned. -->
<import resource="security-config.xml" />
<!-- REST service call context -->
<import resource="classpath:META-INF/spring/integration/rest/applicationContext-http-int.xml"/>
<!-- SFTP contexts -->
<import resource="classpath:META-INF/spring/integration/sftp/SftpInboundReceive-context.xml"/>
<import resource="classpath:META-INF/spring/integration/sftp/SftpOutboundTransfer-context.xml"/>
<import resource="classpath:META-INF/spring/integration/sftp/SftpOutboundTransfer-poll.xml"/>
<!-- MQTT context -->
<import resource="classpath:META-INF/spring/integration/mqtt/mqtt-context.xml"/>
<!-- Component scan base package -->
<context:component-scan base-package="com.XXX.integration.ieg"/>
<!-- All property configuration was moved to this parent context file to solve the
     "property not found" exception. The imported files declare no placeholder of their own,
     so their ${...} values are resolved only when they are loaded through this file. -->
<context:property-placeholder order="1"
location="classpath:/sftpuser.properties, classpath:/sftpfile.properties,classpath:/resthttp.properties, classpath:/mqtt.properties" ignore-unresolvable="true"/>
</beans>
在这种情况下,mqtt 或 sftp 或 resthttp 属性加载失败。请帮忙解决
堆栈跟踪
java.lang.IllegalArgumentException: ${mqttbrokerurl}
at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:448)
at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:260)
at org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory.getAsyncClientInstance(DefaultMqttPahoClientFactory.java:119)
at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.connectAndSubscribe(MqttPahoMessageDrivenChannelAdapter.java:189)
at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.doStart(MqttPahoMessageDrivenChannelAdapter.java:110)
at org.springframework.integration.endpoint.AbstractEndpoint.start(AbstractEndpoint.java:94)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173)
at org.springframework.context.support.DefaultLifecycleProcessor.access0(DefaultLifecycleProcessor.java:51)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:346)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:149)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:112)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:770)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:483)
at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83)
at com.XXX.integration.ieg.mqtt.RunMqtt.test(RunMqtt.java:18)
at com.XXX.integration.ieg.mqtt.RunMqtt.main(RunMqtt.java:39)
属性文件 mqtt.properties:
mqttbrokerurl=tcp://172.18.128.150:1883
根据堆栈跟踪,您的属性占位符没有被解析,原因只是没有为您的属性文件配置 <context:property-placeholder>。
就算你说是:
The properties(mqttbrokerurl, serveruri) are loaded from root context
我在你的代码中没有看到这些东西:
你只展示了 MQTT-Context.xml,而您的 RunMqtt 仅启动了该上下文:
new ClassPathXmlApplicationContext("/META-INF/spring/integration/mqtt/mqtt-context.xml");
所以,请解决这些问题,并返回给我们进一步的进展。
参见