Spring 集成MQTT发布订阅多个主题
Spring integration MQTT publish & subscribe to multiple topics
我正在尝试构建一个订阅多个 mqtt 主题的应用程序,获取信息,处理它并形成 xmls,并在处理时触发一个事件,以便将这些发送到某个云服务器并从那里成功响应将发送回 mqtt 通道。
<int-mqtt:message-driven-channel-adapter
id="mqttAdapter" client-id="${clientId}" url="${brokerUrl}" topics="${topics}"
channel="startCase" auto-startup="true" />
<int:channel id="startCase" />
<int:service-activator id="startCaseService"
input-channel="startCase" ref="msgPollingService" method="pollMessages" />
<bean id="mqttTaskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="5" />
<property name="maxPoolSize" value="10" />
</bean>
<bean id="msgPollingService" class="com.xxxx.xxx.mqttclient.mqtt.MsgPollingService">
<property name="taskExecutor" ref="mqttTaskExecutor" />
<property name="vendorId" value="${vendorId}" />
</bean>
我的问题是如何将其发布到多个渠道,即我是否可以选择将 X 消息发布到 Y 主题。目前我有以下:
<int:channel id="outbound" />
<int-mqtt:outbound-channel-adapter
id="mqtt-publish" client-id="kj" client-factory="clientFactory"
auto-startup="true" url="${brokerUrl}" default-qos="0"
default-retained="true" default-topic="${responseTopic}" channel="outbound" />
<bean id="eventListner" class="com.xxxx.xxxx.mqttclient.event.EventListener">
<property name="sccUrl" value="${url}" />
<property name="restTemplate" ref="restTemplate" />
<property name="channel" ref="outbound" />
</bean>
我可以这样发布:
channel.send(MessageBuilder.withPayload("customResponse").build());
我可以这样做吗:
channel.send(Message<?>, topic)
你的配置看起来不错。然而 MessageChannel
是 loosely-coupling 的抽象,并且只与 Message
打交道。
因此,您要求 a-la channel.send(Message<?>, topic)
对于消息传递概念不正确。
不过,我们为您准备了一个技巧。来自 AbstractMqttMessageHandler
:
String topic = (String) message.getHeaders().get(MqttHeaders.TOPIC);
.....
this.publish(topic == null ? this.defaultTopic : topic, mqttMessage, message);
那么,您需要从代码中得到的是:
channel.send(MessageBuilder.withPayload("customResponse").setHeader(MqttHeaders.TOPIC, topic).build());
换句话说,您应该发送 Message
和 mqtt_topic
header 以实现来自 <int-mqtt:outbound-channel-adapter>
的动态发布。
从另一方面来说,我们不建议直接从应用程序中使用 MessageChannel
s。 <gateway>
with service interface 是针对 end-application 的这种情况。 topic
可以是标记为 @Header(MqttHeaders.TOPIC)
的服务方法参数之一
我正在尝试构建一个订阅多个 mqtt 主题的应用程序,获取信息,处理它并形成 xmls,并在处理时触发一个事件,以便将这些发送到某个云服务器并从那里成功响应将发送回 mqtt 通道。
<int-mqtt:message-driven-channel-adapter
id="mqttAdapter" client-id="${clientId}" url="${brokerUrl}" topics="${topics}"
channel="startCase" auto-startup="true" />
<int:channel id="startCase" />
<int:service-activator id="startCaseService"
input-channel="startCase" ref="msgPollingService" method="pollMessages" />
<bean id="mqttTaskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="5" />
<property name="maxPoolSize" value="10" />
</bean>
<bean id="msgPollingService" class="com.xxxx.xxx.mqttclient.mqtt.MsgPollingService">
<property name="taskExecutor" ref="mqttTaskExecutor" />
<property name="vendorId" value="${vendorId}" />
</bean>
我的问题是如何将其发布到多个渠道,即我是否可以选择将 X 消息发布到 Y 主题。目前我有以下:
<int:channel id="outbound" />
<int-mqtt:outbound-channel-adapter
id="mqtt-publish" client-id="kj" client-factory="clientFactory"
auto-startup="true" url="${brokerUrl}" default-qos="0"
default-retained="true" default-topic="${responseTopic}" channel="outbound" />
<bean id="eventListner" class="com.xxxx.xxxx.mqttclient.event.EventListener">
<property name="sccUrl" value="${url}" />
<property name="restTemplate" ref="restTemplate" />
<property name="channel" ref="outbound" />
</bean>
我可以这样发布:
channel.send(MessageBuilder.withPayload("customResponse").build());
我可以这样做吗:
channel.send(Message<?>, topic)
你的配置看起来不错。然而 MessageChannel
是 loosely-coupling 的抽象,并且只与 Message
打交道。
因此,您要求 a-la channel.send(Message<?>, topic)
对于消息传递概念不正确。
不过,我们为您准备了一个技巧。来自 AbstractMqttMessageHandler
:
String topic = (String) message.getHeaders().get(MqttHeaders.TOPIC);
.....
this.publish(topic == null ? this.defaultTopic : topic, mqttMessage, message);
那么,您需要从代码中得到的是:
channel.send(MessageBuilder.withPayload("customResponse").setHeader(MqttHeaders.TOPIC, topic).build());
换句话说,您应该发送 Message
和 mqtt_topic
header 以实现来自 <int-mqtt:outbound-channel-adapter>
的动态发布。
从另一方面来说,我们不建议直接从应用程序中使用 MessageChannel
s。 <gateway>
with service interface 是针对 end-application 的这种情况。 topic
可以是标记为 @Header(MqttHeaders.TOPIC)