@RabbitListener 用于多个 类 中的同一个队列
@RabbitListener for the same queue in multiple classes
我想知道在 Spring AMQP 中是否可以根据负载类型从多个 class 中的同一队列接收消息。
我知道在 class 中使用 @RabbitListener 注释,然后将 @RabbitHandler 放在方法上,但我想将消息处理的复杂性拆分为多个 classes,同时保持单个排队。
当前使用的版本:Spring AMQP v2.0.3 和 RabbitMQ。
嗯,这是不可能的。那么您希望它的方式将不会是 queue。
设计一个 listener 并根据 payload
类型分配给它的方法,这确实是一个架构决定。
作为解决方法,我建议您将逻辑从单个 @RabbitListener
class 委托给这些业务服务:
@RabbitListener(queues = "foo")
public class MyListener {
private final ServiceA serviceA;
private final ServiceB serviceB;
public MyListener(ServiceA serviceA, ServiceB serviceB) {
this.serviceA = serviceA;
this.serviceB = serviceB;
}
@RabbitHandler
public void handleA(A a) {
this.serviceA.handle(a);
}
@RabbitHandler
public void handleB(B b) {
this.serviceB.handle(b);
}
}
是的,你可以,但它需要一些不同的方法:
您需要听取通用消息类型、进行一些切换以及您自己的反序列化。您当然可以将该代码完全隐藏在某处(基类、注释...)
可以扩展下面的示例以收听任何额外的类型。
上面的示例将过滤 A 和 B DTO 类型。
void receive(ADTO dto)
{
System.out.print(dto.url);
}
void receive(BDTO dto)
{
System.out.print(dto.url);
}
@RabbitListener(queues = "your.queue.name")
public void listenMesssage(Message message)
{
try
{
String typeId = message.getMessageProperties().getHeaders().get("__TypeId__").toString();
String contentType = message.getMessageProperties().getContentType();
if (contentType != "application/json" || typeId == null || !typeId.contains(ADTO.class.toString()))
{
//TODO log warning
System.out.print("type not supported by this service");
return;
}
Object receivedObject = new Jackson2JsonMessageConverter().fromMessage(message);
if (receivedObject instanceof ADTO)
{
receive((ADTO)receivedObject);
System.out.print("ADTO");
}
//else
或者,您也可以像这样进行序列化:
....
String typeId = message.getMessageProperties().getHeaders().get("__TypeId__").toString();
byte[] binMsg = message.getBody();
String strMsg = new String(binMsg, StandardCharsets.UTF_8);
ObjectMapper mapper = new ObjectMapper();
if (typeId.contains("ADTO"))
{
receive(mapper.readValue(strMsg, ADTO.class ));
}
else
...
或者,我发现,如果你有足够高版本的 AQMP 和 Rabbit
(我的 = spring-rabit 2.1.3,spring-starter-aqmp 2.1.2)你可以做这个变形:
@RabbitListener(id="multi", queues = "somequeuename")
public class SomeService
{
@RabbitHandler
public void handleADTO(@Payload ADTO adto) {
System.out.print(adto.url);
}
@RabbitHandler
public void handleADTO2(@Payload ADTO2 adto) {
System.out.print(adto.url);
}
}
但是,开箱即用,这不起作用。您需要配置另一个 "bag of Beans":
@Bean
public SimpleRabbitListenerContainerFactory myFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
RabbitTemplate rabbitTemplate(Jackson2JsonMessageConverter converter, ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
只需将其放入您的应用程序或其他应用程序即可 class。
另见
https://docs.spring.io/spring-amqp/docs/current/reference/html/
我想知道在 Spring AMQP 中是否可以根据负载类型从多个 class 中的同一队列接收消息。
我知道在 class 中使用 @RabbitListener 注释,然后将 @RabbitHandler 放在方法上,但我想将消息处理的复杂性拆分为多个 classes,同时保持单个排队。
当前使用的版本:Spring AMQP v2.0.3 和 RabbitMQ。
嗯,这是不可能的。那么您希望它的方式将不会是 queue。
设计一个 listener 并根据 payload
类型分配给它的方法,这确实是一个架构决定。
作为解决方法,我建议您将逻辑从单个 @RabbitListener
class 委托给这些业务服务:
@RabbitListener(queues = "foo")
public class MyListener {
private final ServiceA serviceA;
private final ServiceB serviceB;
public MyListener(ServiceA serviceA, ServiceB serviceB) {
this.serviceA = serviceA;
this.serviceB = serviceB;
}
@RabbitHandler
public void handleA(A a) {
this.serviceA.handle(a);
}
@RabbitHandler
public void handleB(B b) {
this.serviceB.handle(b);
}
}
是的,你可以,但它需要一些不同的方法: 您需要听取通用消息类型、进行一些切换以及您自己的反序列化。您当然可以将该代码完全隐藏在某处(基类、注释...)
可以扩展下面的示例以收听任何额外的类型。 上面的示例将过滤 A 和 B DTO 类型。
void receive(ADTO dto)
{
System.out.print(dto.url);
}
void receive(BDTO dto)
{
System.out.print(dto.url);
}
@RabbitListener(queues = "your.queue.name")
public void listenMesssage(Message message)
{
try
{
String typeId = message.getMessageProperties().getHeaders().get("__TypeId__").toString();
String contentType = message.getMessageProperties().getContentType();
if (contentType != "application/json" || typeId == null || !typeId.contains(ADTO.class.toString()))
{
//TODO log warning
System.out.print("type not supported by this service");
return;
}
Object receivedObject = new Jackson2JsonMessageConverter().fromMessage(message);
if (receivedObject instanceof ADTO)
{
receive((ADTO)receivedObject);
System.out.print("ADTO");
}
//else
或者,您也可以像这样进行序列化:
....
String typeId = message.getMessageProperties().getHeaders().get("__TypeId__").toString();
byte[] binMsg = message.getBody();
String strMsg = new String(binMsg, StandardCharsets.UTF_8);
ObjectMapper mapper = new ObjectMapper();
if (typeId.contains("ADTO"))
{
receive(mapper.readValue(strMsg, ADTO.class ));
}
else
...
或者,我发现,如果你有足够高版本的 AQMP 和 Rabbit (我的 = spring-rabit 2.1.3,spring-starter-aqmp 2.1.2)你可以做这个变形:
@RabbitListener(id="multi", queues = "somequeuename")
public class SomeService
{
@RabbitHandler
public void handleADTO(@Payload ADTO adto) {
System.out.print(adto.url);
}
@RabbitHandler
public void handleADTO2(@Payload ADTO2 adto) {
System.out.print(adto.url);
}
}
但是,开箱即用,这不起作用。您需要配置另一个 "bag of Beans":
@Bean
public SimpleRabbitListenerContainerFactory myFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
RabbitTemplate rabbitTemplate(Jackson2JsonMessageConverter converter, ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
只需将其放入您的应用程序或其他应用程序即可 class。
另见
https://docs.spring.io/spring-amqp/docs/current/reference/html/