根据 header 属性(例如路由键)处理入站 AMQP 消息
Handle inbound AMQP messages based on a header attribute (e.g. routing key)
我有一个接收 AMQP 消息的服务。此服务绑定到 queue,它接收与一组路由键匹配的所有消息。
我的设置如下:
...
private SomeController controller;
@Autowired
private SimpleMessageListenerContainer receiverContainer;
@Bean
public IntegrationFlow inboundFlow(){
var adpater = Amqp.inboundAdapter(receiverContainer);
return IntegrationFlows.from(adapter)
// some transformations
.handle(controller, "processMessage")
.get();
}
这已经很好用了。但是,现在我想根据 header 属性使用不同的控制器处理一条消息。在这种情况下,我想为每个路由键设置一个控制器。将单个 queue 与多个路由密钥一起使用只是为了对每个密钥进行不同的处理也是一个好主意吗?
在一个交换器和一个队列之间有多个绑定确实是合法的。
在本教程中查看更多信息:https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html。
Amqp.inboundAdapter()
默认依赖于 DefaultAmqpHeaderMapper.inboundMapper()
,它会在生成之前为我们填充一个 AmqpHeaders.RECEIVED_ROUTING_KEY
消息头。因此,您确实可以使用 route(Message.class, m -> m.getHeaders().get(AmqpHeaders.RECEIVED_ROUTING_KEY))
和适当的 channelMapping()
作为路由键值。
我只是想添加一个代码示例,包含 Artem Bilan 的(正确)答案,因为除此之外,我还必须包含一个网关(Artem Bilan 用“适当的 channelMapping()
).
有关为什么需要网关或在某些情况下需要网桥的更多信息,请参阅 documentation 的这一部分。
我的初始代码片段变成如下所示:
...
@Autowired
private FirstController firstController;
@Autowired
private SecondController secondController;
@Autowired
private SimpleMessageListenerContainer receiverContainer;
@Bean
public IntegrationFlow inboundFlow(){
var adpater = Amqp.inboundAdapter(receiverContainer);
return IntegrationFlows.from(adapter)
// some transformations
.route(Message.class, getMessageRoutingKey(m),
m -> m.subFlowMapping("routingKey1", firstFlow())
// after the first subFlow, all further integrationflows are wrapped in a gateway
.subFlowMapping("routingKey2", sf -> sf.gateway(secondFlow())))
.get();
}
@Bean
public IntegrationFlow firstFlow() {
return f -> f
// e.g. additional transformations
.handle(firstController, "processMessageInFirstFashion");
}
@Bean
public IntegrationFlow secondFlow() {
return f -> f
// e.g. additional transformations
.handle(secondController, "processMessageInSecondFashion");
}
private static String getMessageRoutingKey(final Message<?> message) {
return message.getHeaders().get(AmqpHeaders.RECEIVED_ROUTING_KEY).toString();
}
我有一个接收 AMQP 消息的服务。此服务绑定到 queue,它接收与一组路由键匹配的所有消息。
我的设置如下:
...
private SomeController controller;
@Autowired
private SimpleMessageListenerContainer receiverContainer;
@Bean
public IntegrationFlow inboundFlow(){
var adpater = Amqp.inboundAdapter(receiverContainer);
return IntegrationFlows.from(adapter)
// some transformations
.handle(controller, "processMessage")
.get();
}
这已经很好用了。但是,现在我想根据 header 属性使用不同的控制器处理一条消息。在这种情况下,我想为每个路由键设置一个控制器。将单个 queue 与多个路由密钥一起使用只是为了对每个密钥进行不同的处理也是一个好主意吗?
在一个交换器和一个队列之间有多个绑定确实是合法的。
在本教程中查看更多信息:https://www.rabbitmq.com/tutorials/tutorial-four-spring-amqp.html。
Amqp.inboundAdapter()
默认依赖于 DefaultAmqpHeaderMapper.inboundMapper()
,它会在生成之前为我们填充一个 AmqpHeaders.RECEIVED_ROUTING_KEY
消息头。因此,您确实可以使用 route(Message.class, m -> m.getHeaders().get(AmqpHeaders.RECEIVED_ROUTING_KEY))
和适当的 channelMapping()
作为路由键值。
我只是想添加一个代码示例,包含 Artem Bilan 的(正确)答案,因为除此之外,我还必须包含一个网关(Artem Bilan 用“适当的 channelMapping()
).
有关为什么需要网关或在某些情况下需要网桥的更多信息,请参阅 documentation 的这一部分。
我的初始代码片段变成如下所示:
...
@Autowired
private FirstController firstController;
@Autowired
private SecondController secondController;
@Autowired
private SimpleMessageListenerContainer receiverContainer;
@Bean
public IntegrationFlow inboundFlow(){
var adpater = Amqp.inboundAdapter(receiverContainer);
return IntegrationFlows.from(adapter)
// some transformations
.route(Message.class, getMessageRoutingKey(m),
m -> m.subFlowMapping("routingKey1", firstFlow())
// after the first subFlow, all further integrationflows are wrapped in a gateway
.subFlowMapping("routingKey2", sf -> sf.gateway(secondFlow())))
.get();
}
@Bean
public IntegrationFlow firstFlow() {
return f -> f
// e.g. additional transformations
.handle(firstController, "processMessageInFirstFashion");
}
@Bean
public IntegrationFlow secondFlow() {
return f -> f
// e.g. additional transformations
.handle(secondController, "processMessageInSecondFashion");
}
private static String getMessageRoutingKey(final Message<?> message) {
return message.getHeaders().get(AmqpHeaders.RECEIVED_ROUTING_KEY).toString();
}