我们如何使用 spring 集成让不同的处理程序订阅来自不同主题的消息?
How can we have different handlers to subscribe messages from different topics using spring Integration?
我编写了我的第一个 spring 集成应用程序,它使用 mqtt 代理订阅来自设备的不同主题的消息。设备正在发布消息,客户端(代码)正在使用相同的主题访问这些消息。
我添加了一个处理程序来访问来自代理的消息,并在 类 中进一步使用它。现在,就我而言,我想为不同的主题设置不同的处理程序,以便它们都可以映射到不同的 VO 类 并在业务逻辑中进一步使用它。
据我所知,我只想创建一个到代理的连接,一个通道,但可能会出现不同的主题,它们应该在同一连接的不同处理程序中处理。我怎样才能做到这一点?
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
// SpringApplicationBuilder springApplicationBuilder = new SpringApplicationBuilder(MqttJavaApplication.class);
SpringApplication.run(MqttJavaApplication.class,args);
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoMessageDrivenChannelAdapter inbound() {
String clientId = "uuid-" + UUID.randomUUID().toString();
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", clientId,"camera/status");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
// adapter.setOutputChannelName("mqttInputChannel");
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
public IntegrationFlow mqttInFlow() {
System.out.println(Arrays.stream(SubScribeMessages.class.getMethods()).findFirst());
return IntegrationFlows.from(inbound())
.transform(p -> p)
.handle("addTopics","handlHere")
.get();
}
@Component
public class MyService{
@Autowired
MqttPahoMessageDrivenChannelAdapter adapter;
@Bean
public String addTopics()
{
if(adapter.getTopic().length>0)
{
adapter.addTopic("camera/+/counts"); //new topic
adapter.addTopic("camera/+/live_counts"); //new topic
}
return "";
}
// topic "camera/+/counts" is handled here but other messages also come here, how do we handle other topics in separate handlers?
@ServiceActivator(inputChannel = "mqttInputChannel")
public void handleHere(@Payload Object mess) throws JsonProcessingException {
String[] topics = adapter.getTopic();
for(String topic:topics)
System.out.println(topic); // How can I get topic name which is using a wildcard?
ObjectMapper objectMapper = new ObjectMapper();
String json=mess.toString();
System.out.println(json);
CountVo countVo = objectMapper.readValue(json, CountVo.class);
if (!countVo.equals(null))
System.out.println(countVo.getIrisysCounts().get(0).getName());
}
}
}
补充问题
使用通配符时如何获取完整的主题名称?已发布但被通配符捕获的实际主题。
请帮忙。
添加路由器(.route(...)
);您可以在 MqttHeaders.RECEIVED_TOPIC
header(包含主题名称)上路由到每个主题的不同流程。
编辑
最简单的路由器是简单地将主题名称映射到频道名称。这是一个例子:
@SpringBootApplication
public class So67391175Application {
public static void main(String[] args) {
SpringApplication.run(So67391175Application.class, args);
}
@Bean
public DefaultMqttPahoClientFactory pahoClientFactory() {
DefaultMqttPahoClientFactory pahoClientFactory = new DefaultMqttPahoClientFactory();
MqttConnectOptions connectionOptions = new MqttConnectOptions();
connectionOptions.setServerURIs(new String[] { "tcp://localhost:1883" });
pahoClientFactory.setConnectionOptions(connectionOptions);
return pahoClientFactory;
}
@Bean
public IntegrationFlow mqttInFlow(DefaultMqttPahoClientFactory pahoClientFactory) {
return IntegrationFlows.from(
new MqttPahoMessageDrivenChannelAdapter("testClient",
pahoClientFactory, "topic1", "topic2"))
.route("headers['" + MqttHeaders.RECEIVED_TOPIC + "']")
.get();
}
@Bean
public IntegrationFlow flow1() {
return IntegrationFlows.from("topic1")
.handle((payload, headers) -> {
System.out.println("message from topic1 " + payload + ": " + headers);
return null;
})
.get();
}
@Bean
public IntegrationFlow flow2() {
return IntegrationFlows.from("topic2")
.handle((payload, headers) -> {
System.out.println("message from topic2 " + payload + ": " + headers);
return null;
})
.get();
}
}
message from topic1 test: {mqtt_receivedRetained=false, mqtt_id=1, mqtt_duplicate=false, id=1d950bce-aa47-7e3b-1a0d-e4d01ed707de, mqtt_receivedTopic=topic1, mqtt_receivedQos=1, timestamp=1620250633090}
message from topic2 test: {mqtt_receivedRetained=false, mqtt_id=2, mqtt_duplicate=false, id=7e9c3f51-c148-2b18-3588-ed27e93dae19, mqtt_receivedTopic=topic2, mqtt_receivedQos=1, timestamp=1620250644602}
谢谢加里!我认为你在路由上给出的答案只能采用定义的主题,而不是通配符或任何其他正则表达式。我无法理解动态路由对我有何帮助。
事实证明,我可以在初始化 bean 时添加通配符,并且可以使用适配器处理在输入通道上使用服务激活器。
像这样:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
SpringApplication.run(MqttJavaApplication.class,args);
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoMessageDrivenChannelAdapter inbound() {
String clientId = "uuid-" + UUID.randomUUID().toString();
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", clientId, "irisys/V4D-20230143/status" );
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Component
public class MyService{
@Autowired
MqttPahoMessageDrivenChannelAdapter adapter;
@Bean
public String addTopics()
{
if(adapter.getTopic().length>0)
{
adapter.addTopic("camera/+/counts");
}
return "";
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@SneakyThrows
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
System.out.println(message.getHeaders());
if(message.getHeaders().get("mqtt_receivedTopic").toString().contains("counts"))
{
ObjectMapper objectMapper = new ObjectMapper();
String json=message.getPayload().toString();
System.out.println(json);
CountVo countVo = objectMapper.readValue(json, CountVo.class);
if (!countVo.equals(null))
System.out.println(countVo.getIrisysCounts().get(0).getName());
}
}
};
}
}
}
您认为还有比这更好的方法吗?除了这个我想不出别的了。
我编写了我的第一个 spring 集成应用程序,它使用 mqtt 代理订阅来自设备的不同主题的消息。设备正在发布消息,客户端(代码)正在使用相同的主题访问这些消息。
我添加了一个处理程序来访问来自代理的消息,并在 类 中进一步使用它。现在,就我而言,我想为不同的主题设置不同的处理程序,以便它们都可以映射到不同的 VO 类 并在业务逻辑中进一步使用它。
据我所知,我只想创建一个到代理的连接,一个通道,但可能会出现不同的主题,它们应该在同一连接的不同处理程序中处理。我怎样才能做到这一点?
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
// SpringApplicationBuilder springApplicationBuilder = new SpringApplicationBuilder(MqttJavaApplication.class);
SpringApplication.run(MqttJavaApplication.class,args);
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoMessageDrivenChannelAdapter inbound() {
String clientId = "uuid-" + UUID.randomUUID().toString();
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", clientId,"camera/status");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
// adapter.setOutputChannelName("mqttInputChannel");
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
public IntegrationFlow mqttInFlow() {
System.out.println(Arrays.stream(SubScribeMessages.class.getMethods()).findFirst());
return IntegrationFlows.from(inbound())
.transform(p -> p)
.handle("addTopics","handlHere")
.get();
}
@Component
public class MyService{
@Autowired
MqttPahoMessageDrivenChannelAdapter adapter;
@Bean
public String addTopics()
{
if(adapter.getTopic().length>0)
{
adapter.addTopic("camera/+/counts"); //new topic
adapter.addTopic("camera/+/live_counts"); //new topic
}
return "";
}
// topic "camera/+/counts" is handled here but other messages also come here, how do we handle other topics in separate handlers?
@ServiceActivator(inputChannel = "mqttInputChannel")
public void handleHere(@Payload Object mess) throws JsonProcessingException {
String[] topics = adapter.getTopic();
for(String topic:topics)
System.out.println(topic); // How can I get topic name which is using a wildcard?
ObjectMapper objectMapper = new ObjectMapper();
String json=mess.toString();
System.out.println(json);
CountVo countVo = objectMapper.readValue(json, CountVo.class);
if (!countVo.equals(null))
System.out.println(countVo.getIrisysCounts().get(0).getName());
}
}
}
补充问题
使用通配符时如何获取完整的主题名称?已发布但被通配符捕获的实际主题。
请帮忙。
添加路由器(.route(...)
);您可以在 MqttHeaders.RECEIVED_TOPIC
header(包含主题名称)上路由到每个主题的不同流程。
编辑
最简单的路由器是简单地将主题名称映射到频道名称。这是一个例子:
@SpringBootApplication
public class So67391175Application {
public static void main(String[] args) {
SpringApplication.run(So67391175Application.class, args);
}
@Bean
public DefaultMqttPahoClientFactory pahoClientFactory() {
DefaultMqttPahoClientFactory pahoClientFactory = new DefaultMqttPahoClientFactory();
MqttConnectOptions connectionOptions = new MqttConnectOptions();
connectionOptions.setServerURIs(new String[] { "tcp://localhost:1883" });
pahoClientFactory.setConnectionOptions(connectionOptions);
return pahoClientFactory;
}
@Bean
public IntegrationFlow mqttInFlow(DefaultMqttPahoClientFactory pahoClientFactory) {
return IntegrationFlows.from(
new MqttPahoMessageDrivenChannelAdapter("testClient",
pahoClientFactory, "topic1", "topic2"))
.route("headers['" + MqttHeaders.RECEIVED_TOPIC + "']")
.get();
}
@Bean
public IntegrationFlow flow1() {
return IntegrationFlows.from("topic1")
.handle((payload, headers) -> {
System.out.println("message from topic1 " + payload + ": " + headers);
return null;
})
.get();
}
@Bean
public IntegrationFlow flow2() {
return IntegrationFlows.from("topic2")
.handle((payload, headers) -> {
System.out.println("message from topic2 " + payload + ": " + headers);
return null;
})
.get();
}
}
message from topic1 test: {mqtt_receivedRetained=false, mqtt_id=1, mqtt_duplicate=false, id=1d950bce-aa47-7e3b-1a0d-e4d01ed707de, mqtt_receivedTopic=topic1, mqtt_receivedQos=1, timestamp=1620250633090}
message from topic2 test: {mqtt_receivedRetained=false, mqtt_id=2, mqtt_duplicate=false, id=7e9c3f51-c148-2b18-3588-ed27e93dae19, mqtt_receivedTopic=topic2, mqtt_receivedQos=1, timestamp=1620250644602}
谢谢加里!我认为你在路由上给出的答案只能采用定义的主题,而不是通配符或任何其他正则表达式。我无法理解动态路由对我有何帮助。
事实证明,我可以在初始化 bean 时添加通配符,并且可以使用适配器处理在输入通道上使用服务激活器。
像这样:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
SpringApplication.run(MqttJavaApplication.class,args);
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoMessageDrivenChannelAdapter inbound() {
String clientId = "uuid-" + UUID.randomUUID().toString();
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", clientId, "irisys/V4D-20230143/status" );
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Component
public class MyService{
@Autowired
MqttPahoMessageDrivenChannelAdapter adapter;
@Bean
public String addTopics()
{
if(adapter.getTopic().length>0)
{
adapter.addTopic("camera/+/counts");
}
return "";
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@SneakyThrows
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
System.out.println(message.getHeaders());
if(message.getHeaders().get("mqtt_receivedTopic").toString().contains("counts"))
{
ObjectMapper objectMapper = new ObjectMapper();
String json=message.getPayload().toString();
System.out.println(json);
CountVo countVo = objectMapper.readValue(json, CountVo.class);
if (!countVo.equals(null))
System.out.println(countVo.getIrisysCounts().get(0).getName());
}
}
};
}
}
}
您认为还有比这更好的方法吗?除了这个我想不出别的了。