Spring MVC + Mosquitto + MQTT integration can't get any message

Using Spring's integration library, I am trying to connect to Mosquitto and read/send messages, but there are a few things I can't figure out.

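1 - When the application initializes, it connects to Mosquitto, but within a few seconds Mosquitto receives hundreds of further connection requests from the same application with the same client ID. Here is an excerpt from the log: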

New connection from 127.0.0.1 on port 1555.
Client springClient already connected, closing old connection.
Client springClient disconnected.
New client connected from 127.0.0.1 as springClient (c1, k60).
Sending CONNACK to springClient (0, 0)
Received SUBSCRIBE from springClient
    0001/001/INF (QoS 1)
springClient 1 0001/001/INF
Sending SUBACK to springClient
New connection from 127.0.0.1 on port 1555.
Client springClient already connected, closing old connection.
Client springClient disconnected.

2 - With this configuration I can't receive any messages from Mosquitto:

Spring XML :

<!-- This is for reading messages -->
<beans:bean id="mqttInbound" class="com.mobistech.drc.m2mproject.mqtt.MqttCustomInboundAdapter">
    <beans:constructor-arg name="clientId" value="springClient" />
    <beans:constructor-arg name="clientFactory" ref="clientFactory" />
    <beans:constructor-arg name="topic" value="0001/001/INF" />
    <beans:property name="autoStartup" value="true"></beans:property>
    <beans:property name="outputChannel" ref="fromBrokerChannel"></beans:property>
</beans:bean>

 <int:channel  id="fromBrokerChannel" />

Custom adapter:

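import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;

public class MqttCustomInboundAdapter extends MqttPahoMessageDrivenChannelAdapter {

    public MqttCustomInboundAdapter(String clientId,
            MqttPahoClientFactory clientFactory, String[] topic) {
        super(clientId, clientFactory, topic);
    }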

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception
    {
        super.messageArrived(topic, message);
        System.out.println("**************** Message from topic : " + topic);
        System.out.println("**************** Message : " + new String(message.getPayload()));
    }

    public void addTopicIfNotExists(String topic)
    {
        for(String topicName:getTopic())
        {
            if(topicName.equals(topic))return;
        }

        addTopic(topic);

        System.out.println("************* Added Topic : " + topic);

        for(String topicName:getTopic())
        {
            System.out.println(topicName);
        }
    }
}

I didn't use a service-activator because I need to know which topic each message was sent from, so I wrapped MqttPahoMessageDrivenChannelAdapter, as mentioned in the Spring Integration docs.

So, any suggestions?

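I figured it out. After doing some research, I decided to use a service-activator to activate the service (which, in hindsight, is obvious). After that I was able to receive messages.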

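As for Mosquitto's strange behavior, I worked out that it has nothing to do with Mosquitto. When the autoStartup property of MqttCustomInboundAdapter is set to true, the application sends too many connection requests. Mosquitto accepts these requests one by one, and each new connection causes the previous one to be disconnected.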
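
To make the log pattern easier to read: MQTT requires a broker to drop an existing connection when a new one arrives with the same client ID, which is what the "already connected, closing old connection" lines show. The following is only a simplified model of that rule in plain Java, not Mosquitto's actual code; the class names Broker and Connection are hypothetical and exist just for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of how a broker treats duplicate client IDs (not Mosquitto's code). */
class ClientIdTakeoverDemo {

    /** One connection as seen by the broker. */
    static final class Connection {
        final String clientId;
        boolean open = true;

        Connection(String clientId) {
            this.clientId = clientId;
        }
    }

    /** Hypothetical broker-side registry: at most one live connection per client ID. */
    static final class Broker {
        private final Map<String, Connection> sessions = new HashMap<>();
        final List<String> log = new ArrayList<>();

        Connection connect(String clientId) {
            Connection old = sessions.get(clientId);
            if (old != null && old.open) {
                // Same ID connects again: the older connection is closed.
                old.open = false;
                log.add("Client " + clientId + " already connected, closing old connection.");
            }
            Connection fresh = new Connection(clientId);
            sessions.put(clientId, fresh);
            log.add("New client connected as " + clientId);
            return fresh;
        }
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        Connection first = broker.connect("springClient");
        Connection second = broker.connect("springClient");    // same ID: evicts 'first'
        Connection other = broker.connect("springClient-pub"); // distinct ID: no eviction
        broker.log.forEach(System.out::println);
        System.out.println("first open? " + first.open
                + ", second open? " + second.open
                + ", other open? " + other.open);
    }
}
```

If two adapters (or two starts of the same adapter) share one client ID and each reconnects after being dropped, they would keep evicting each other, which would look like the flood of connection requests in the log. Giving every connection its own client ID avoids that.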

The new XML looks like this:

<beans:bean id="mqttInbound" class="com.mobistech.drc.m2mproject.mqtt.MqttCustomInboundAdapter">
    <beans:constructor-arg name="clientId" value="springClient" />
    <beans:constructor-arg name="clientFactory" ref="clientFactory" />
    <beans:constructor-arg name="topic" value="0001/001/INF" />
    <beans:property name="autoStartup" value="false"></beans:property>
    <beans:property name="outputChannel" ref="fromBrokerChannel"></beans:property>
    <beans:property name="converter" ref="mqttMessageConverter"></beans:property>
</beans:bean>

 <int:channel id="fromBrokerChannel" />
<int:service-activator input-channel="fromBrokerChannel" ref="mqttServiceActivator" ></int:service-activator>
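
The mqttServiceActivator bean itself is not shown above. As a rough sketch only, the handler behind it could read the source topic from the message headers instead of overriding the adapter. Everything here is an assumption: the class and method names are hypothetical, the payload is assumed to arrive as a String, and the header keys "mqtt_receivedTopic" and "mqtt_topic" should be checked against the MqttHeaders constants of your Spring Integration version. The sketch is kept free of Spring imports so it runs on its own; in a real application the Map parameter would typically be annotated with @Headers and the XML would name the method (for example method="handle").

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical handler that the "mqttServiceActivator" bean could point at. */
class MqttServiceActivator {

    // Assumed header names; verify against MqttHeaders in your version.
    static final String RECEIVED_TOPIC = "mqtt_receivedTopic";
    static final String LEGACY_TOPIC = "mqtt_topic";

    /** Builds a one-line description of the message, including its source topic. */
    static String describe(String payload, Map<String, Object> headers) {
        Object topic = headers.get(RECEIVED_TOPIC);
        if (topic == null) {
            topic = headers.get(LEGACY_TOPIC); // fallback for the assumed older key
        }
        return "Message from topic " + topic + ": " + payload;
    }

    /** Returns void so that no reply channel is needed for the service activator. */
    public void handle(String payload, Map<String, Object> headers) {
        System.out.println(describe(payload, headers));
    }

    public static void main(String[] args) {
        Map<String, Object> headers =
                Collections.singletonMap(RECEIVED_TOPIC, "0001/001/INF");
        new MqttServiceActivator().handle("hello", headers);
    }
}
```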

Now I can receive messages...

I managed to get this working using Java configuration.

MQTT configuration:
@Bean
public MqttPahoMessageDrivenChannelAdapter mqttInbound() {

    MqttPahoMessageDrivenChannelAdapter mqtt = new MqttPahoMessageDrivenChannelAdapter( applicationName + "-sub", clientFactory( ), "/#" );
    mqtt.setQos( 2 );
    mqtt.setOutputChannel( outbount( ) );
    mqtt.setAutoStartup( true );
    mqtt.setTaskScheduler( taskScheduler( ) );

    return mqtt;
}

@Bean
public MqttPahoMessageHandler mqqtMessageHandler() {

    return new MqttPahoMessageHandler( applicationName + "-pub", clientFactory( ) );
}

@Bean
public DefaultMqttPahoClientFactory clientFactory() {

    DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory( );
    clientFactory.setUserName( "test" );
    clientFactory.setPassword( "test" );
    clientFactory.setServerURIs( new String[] { "tcp://url:1883" } );
    return clientFactory;
}

@Bean
public PublishSubscribeChannel outbount() {

    PublishSubscribeChannel psc = new PublishSubscribeChannel( );
    psc.subscribe( new MessageHandler( ) {

        @Override
        public void handleMessage( Message<?> message ) throws MessagingException {

            logger.warn( message );

        }
    } );

    return psc;
}

To send a message, add the following:

@Autowired
MqttPahoMessageHandler mqtt;

@RequestMapping( "/" )
public ModelAndView getHomePage() throws MqttPersistenceException, MqttException {

    Message<String> message = MessageBuilder.withPayload( "spring - test" ).setHeader( MqttHeaders.TOPIC, "/topic" ).build( );

    mqtt.handleMessage( message );

    return new ModelAndView( "home" );
}
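
The inbound adapter above subscribes to "/#" while the controller publishes to "/topic". A small sketch of MQTT topic-filter matching shows why that publication reaches the subscriber, and why a topic without the leading slash would not. This is a minimal illustration written for this answer (the class TopicFilterDemo is hypothetical); it is not the broker's or Paho's implementation, and it ignores the special rules for topics starting with "$".

```java
/** Minimal MQTT topic-filter matcher (sketch; ignores special handling of '$' topics). */
class TopicFilterDemo {

    static boolean matches(String filter, String topic) {
        // Limit -1 keeps empty levels, e.g. "/topic" -> ["", "topic"].
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // '#' is only valid as the last level; it matches the parent
                // level and everything below it.
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // '+' matches exactly one level; otherwise exact match
            }
        }
        return f.length == t.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("/#", "/topic"));               // true
        System.out.println(matches("/#", "topic"));                // false
        System.out.println(matches("0001/+/INF", "0001/001/INF")); // true
    }
}
```

Because "/#" starts with an empty first level, it only matches topics that also begin with a slash. A plain "#" would be needed to receive topics such as 0001/001/INF from the question.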