MQTT- Spring 集成

MQTT- Spring Integration

我正在尝试运行示例 MQTT-Spring Integration 项目。不过,我修改了一些配置细节,使其从属性文件中读取,并尝试从 RunMqtt.java 文件连接到消息代理。

代码片段如下

MQTT-Context.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Spring Integration wiring for MQTT: an inbound message-driven adapter subscribes to a topic
    and hands every received message to MqttCaseService.startCase via the "startCase" channel.

    NOTE(review): this file declares no property-placeholder of its own. The ${serveruri} and
    ${mqttbrokerurl} placeholders below are only resolved when the application context that
    loads this file also registers a placeholder configurer; otherwise the literal "${...}"
    text is passed through unchanged.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
    xsi:schemaLocation="
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.1.xsd">


    <!-- Paho client factory carrying the broker credentials and server URIs.
         NOTE(review): the adapter below has no client-factory attribute, so it does not appear
         to use this bean. Confirm whether that is intended. -->
    <bean id="clientFactory"
        class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
        <property name="userName" value="username" />
        <property name="password" value="password" />
        <property name="serverURIs" value="${serveruri}"></property>
    </bean>

    <!-- Global wire tap: intercept every message and log it through the "logger" adapter at ERROR level -->
    <int:logging-channel-adapter id="logger"
        level="ERROR" />
    <int:wire-tap channel="logger" />
    <!-- Inbound MQTT adapter: subscribes to "topic" on the broker at ${mqttbrokerurl} and publishes
         received messages to the "startCase" channel. With auto-startup="true" the adapter is started
         by the container during context refresh instead of from application code. -->
    <int-mqtt:message-driven-channel-adapter
        id="startCaseAdapter" client-id="clientId" url="${mqttbrokerurl}"
        topics="topic" channel="startCase" auto-startup="true"  />
    <int:channel id="startCase" />


    <!-- Deliver each message on "startCase" to mqttCaseService.startCase(...) -->
    <int:service-activator id="startCaseService"
        input-channel="startCase" ref="mqttCaseService" method="startCase" />

    <!-- Handler bean invoked by the service activator above -->
    <bean id="mqttCaseService" class="com.XXX.integration.ieg.mqtt.MqttCaseService" />
</beans>

属性(mqttbrokerurl、serveruri)从根上下文加载。

RunMqtt.java

package com.XXX.integration.ieg.mqtt;

import java.nio.charset.StandardCharsets;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;

/**
 * Manual smoke test for the MQTT integration flow: boots the Spring context that declares the
 * inbound MQTT adapter, then publishes one message to the broker with a plain Paho client.
 */
public class RunMqtt {

    /**
     * Loads {@code mqtt-context.xml}, connects to {@code tcp://localhost:1883} and publishes a
     * single QoS 1 message to the topic {@code "topic"}.
     *
     * @throws MqttException if creating the client, connecting, publishing or disconnecting fails
     */
    public void test() throws MqttException{
        // NOTE(review): only mqtt-context.xml is loaded here, so every ${...} placeholder in that
        // file must be resolvable from this context alone.
        // NOTE(review): the context is never closed, so the inbound adapter keeps running after
        // this method returns; confirm that is intended.
        ApplicationContext ac = new ClassPathXmlApplicationContext("/META-INF/spring/integration/mqtt/mqtt-context.xml");
        System.out.println(ac);
        MqttPahoMessageDrivenChannelAdapter startCaseAdapter = (MqttPahoMessageDrivenChannelAdapter)ac.getBean("startCaseAdapter");
        //Uncomment to start the adapter manually from program
        //startCaseAdapter.start();
        //DefaultMqttPahoClientFactory clientFactory = (DefaultMqttPahoClientFactory)ac.getBean("clientFactory");
        DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();

        MqttClient mclient = clientFactory.getClientInstance("tcp://localhost:1883", "JavaSample");
        String data = "This is what I am sending in 2nd attempt";
        // Encode explicitly so the payload bytes do not depend on the platform default charset.
        MqttMessage mm = new MqttMessage(data.getBytes(StandardCharsets.UTF_8));
        mm.setQos(1);
        try {
            mclient.connect();
            mclient.publish("topic", mm);
        } finally {
            // Release the connection even when connect() or publish() throws.
            if (mclient.isConnected()) {
                mclient.disconnect();
            }
            mclient.close();
        }
        //Uncomment to stop the adapter manually from program
        //startCaseAdapter.stop();
    }


    /**
     * Entry point: runs {@link #test()} once.
     *
     * @param args ignored
     * @throws MqttException propagated from {@link #test()}
     */
    public static void main(String[] args) throws MqttException {
        new RunMqtt().test();

    }

    // Stand-alone Paho publish sample (no Spring), kept disabled for reference.
    /*public static void main1(String[] args) {

        String topic        = "MQTT Examples";
        String content      = "Message from MqttPublishSample";
        int qos             = 2;
        String broker       = "tcp://localhost:1883";
        String clientId     = "JavaSample";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            System.out.println("Connecting to broker: "+broker);
            sampleClient.connect(connOpts);
            System.out.println("Connected");
            System.out.println("Publishing message: "+content);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            sampleClient.publish(topic, message);
            System.out.println("Message published");
            sampleClient.disconnect();
            System.out.println("Disconnected");
            System.exit(0);
        } catch(MqttException me) {
            System.out.println("reason "+me.getReasonCode());
            System.out.println("msg "+me.getMessage());
            System.out.println("loc "+me.getLocalizedMessage());
            System.out.println("cause "+me.getCause());
            System.out.println("excep "+me);
            me.printStackTrace();
        }
    }*/

}

根上下文如下

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Root application context for the web app. It imports the security, REST, SFTP and MQTT
    contexts and registers the single property-placeholder whose property files supply the
    ${...} values used by them.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd">
     <!-- This file is the root context file for the web app. All other contexts are imported here. -->
     <!-- Security context. Spring Security with basic authentication is implemented for now; OAuth2 is planned. -->
    <import resource="security-config.xml" />
    <!-- REST service call context -->
    <import resource="classpath:META-INF/spring/integration/rest/applicationContext-http-int.xml"/>
    <!-- SFTP contexts -->
    <import resource="classpath:META-INF/spring/integration/sftp/SftpInboundReceive-context.xml"/>
    <import resource="classpath:META-INF/spring/integration/sftp/SftpOutboundTransfer-context.xml"/>
    <import resource="classpath:META-INF/spring/integration/sftp/SftpOutboundTransfer-poll.xml"/>
    <!-- MQTT context -->
    <import resource="classpath:META-INF/spring/integration/mqtt/mqtt-context.xml"/>


    <!-- Component scan base package -->
    <context:component-scan base-package="com.XXX.integration.ieg"/>
    <!-- All property configuration was moved to this parent context file to solve the
         property-not-found exception. The imported contexts depend on this placeholder, so their
         ${...} values are only resolved when they are loaded through this file. -->
        <context:property-placeholder order="1"
        location="classpath:/sftpuser.properties, classpath:/sftpfile.properties,classpath:/resthttp.properties, classpath:/mqtt.properties" ignore-unresolvable="true"/>

</beans>

在这种情况下,mqtt 或 sftp 或 resthttp 属性加载失败。请帮忙解决

堆栈跟踪

java.lang.IllegalArgumentException: ${mqttbrokerurl}
    at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:448)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:260)
    at org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory.getAsyncClientInstance(DefaultMqttPahoClientFactory.java:119)
    at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.connectAndSubscribe(MqttPahoMessageDrivenChannelAdapter.java:189)
    at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.doStart(MqttPahoMessageDrivenChannelAdapter.java:110)
    at org.springframework.integration.endpoint.AbstractEndpoint.start(AbstractEndpoint.java:94)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173)
    at org.springframework.context.support.DefaultLifecycleProcessor.access0(DefaultLifecycleProcessor.java:51)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:346)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:149)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:112)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:770)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:483)
    at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
    at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83)
    at com.XXX.integration.ieg.mqtt.RunMqtt.test(RunMqtt.java:18)
    at com.XXX.integration.ieg.mqtt.RunMqtt.main(RunMqtt.java:39)

属性文件 mqtt.properties 的内容:mqttbrokerurl=tcp://172.18.128.150:1883

根据堆栈跟踪,您的属性占位符没有被解析,这只是因为您没有为属性文件配置 <context:property-placeholder>。

尽管你说:

The properties(mqttbrokerurl, serveruri) are loaded from root context

我在你的代码中没有看到这些东西:

  1. 你只显示MQTT-Context.xml

  2. 您的 RunMqtt 仅启动该上下文:

    new ClassPathXmlApplicationContext("/META-INF/spring/integration/mqtt/mqtt-context.xml");
    

所以,请解决这些问题,并返回给我们进一步的进展。

参见相关文档。当在 Web 应用程序中运行时,类路径是相对于 Web 上下文的。