Spring Mqtt 集成:出站主题问题
Spring Mqtt integration: outbound topic issue
您好,我正在尝试使用 spring 集成来接收 MQTT 消息,处理它们并发布到另一个主题。
这里是 integration.xml:
<int-mqtt:outbound-channel-adapter id="mqtt-publish"
client-id="spring-foo-1"
client-factory="clientFactory"
auto-startup="true"
url="tcp://localhost:1883"
default-qos="0"
default-retained="true"
default-topic="tweets/akki" />
<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
client-id="spring-foo-2"
client-factory="clientFactory"
auto-startup="true"
url="tcp://localhost:1883"
topics="mqtt/publish"
/>
<int:service-activator input-channel="oneTopicAdapter" method="logMessages" ref="mqttLogger" output-channel="mqtt-publish"></int:service-activator>
<bean id="mqttLogger" class="hello.mqttReceiver" />
和mqttReceiver.java:
package hello;
public class mqttReceiver {
public String logMessages(String a){
String processed_data = a; //TODO Process Data
return processed_data;
}
}
以下是我面临的问题:
processed_data
被路由到 mqtt/publish 而不是 mqtt/akki
processed_data
不是发表过多次
这是正确的,因为 AbstractMqttMessageHandler
首先查看 headers
:
String topic = (String) message.getHeaders().get(MqttHeaders.TOPIC);
Object mqttMessage = this.converter.fromMessage(message, Object.class);
if (topic == null && this.defaultTopic == null) {
throw new MessageHandlingException(message,
"No '" + MqttHeaders.TOPIC + "' header and no default topic defined");
}
当 DefaultPahoMessageConverter
在消息到达时从 MqttPahoMessageDrivenChannelAdapter
填充 MqttHeaders.TOPIC
header。
在向 <int-mqtt:outbound-channel-adapter>
发送消息之前,您应该考虑使用 <int:header-filter header-names="mqtt_topic"/>
您好,我正在尝试使用 spring 集成来接收 MQTT 消息,处理它们并发布到另一个主题。
这里是 integration.xml:
<int-mqtt:outbound-channel-adapter id="mqtt-publish"
client-id="spring-foo-1"
client-factory="clientFactory"
auto-startup="true"
url="tcp://localhost:1883"
default-qos="0"
default-retained="true"
default-topic="tweets/akki" />
<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
client-id="spring-foo-2"
client-factory="clientFactory"
auto-startup="true"
url="tcp://localhost:1883"
topics="mqtt/publish"
/>
<int:service-activator input-channel="oneTopicAdapter" method="logMessages" ref="mqttLogger" output-channel="mqtt-publish"></int:service-activator>
<bean id="mqttLogger" class="hello.mqttReceiver" />
和mqttReceiver.java:
package hello;
public class mqttReceiver {
public String logMessages(String a){
String processed_data = a; //TODO Process Data
return processed_data;
}
}
以下是我面临的问题:
processed_data
被路由到 mqtt/publish 而不是 mqtt/akkiprocessed_data
不是发表过多次
这是正确的,因为 AbstractMqttMessageHandler
首先查看 headers
:
String topic = (String) message.getHeaders().get(MqttHeaders.TOPIC);
Object mqttMessage = this.converter.fromMessage(message, Object.class);
if (topic == null && this.defaultTopic == null) {
throw new MessageHandlingException(message,
"No '" + MqttHeaders.TOPIC + "' header and no default topic defined");
}
当 DefaultPahoMessageConverter
在消息到达时从 MqttPahoMessageDrivenChannelAdapter
填充 MqttHeaders.TOPIC
header。
在向 <int-mqtt:outbound-channel-adapter>
<int:header-filter header-names="mqtt_topic"/>