@RabbitListener 用于多个 类 中的同一个队列

@RabbitListener for the same queue in multiple classes

我想知道在 Spring AMQP 中是否可以根据负载类型从多个 class 中的同一队列接收消息。

我知道在 class 中使用 @RabbitListener 注释,然后将 @RabbitHandler 放在方法上,但我想将消息处理的复杂性拆分为多个 classes,同时保持单个排队。

当前使用的版本:Spring AMQP v2.0.3 和 RabbitMQ。

嗯,这是不可能的。那么您希望它的方式将不会是 queue。 设计一个 listener 并根据 payload 类型分配给它的方法,这确实是一个架构决定。

作为解决方法,我建议您将逻辑从单个 @RabbitListener class 委托给这些业务服务:

    @RabbitListener(queues = "foo")
    public class MyListener {

        private final ServiceA serviceA;

        private final ServiceB serviceB;

        public MyListener(ServiceA serviceA, ServiceB serviceB) {
              this.serviceA = serviceA;
              this.serviceB = serviceB;
        }


        @RabbitHandler
        public void handleA(A a) {
             this.serviceA.handle(a);
        }

        @RabbitHandler
        public void handleB(B b) {
             this.serviceB.handle(b);
        }
    }

是的,你可以,但它需要一些不同的方法: 您需要听取通用消息类型、进行一些切换以及您自己的反序列化。您当然可以将该代码完全隐藏在某处(基类、注释...)

可以扩展下面的示例以收听任何额外的类型。 上面的示例将过滤 A 和 B DTO 类型。

void receive(ADTO dto)
{
    System.out.print(dto.url);
}

void receive(BDTO dto)
{
    System.out.print(dto.url);
}
@RabbitListener(queues = "your.queue.name")
public void listenMesssage(Message message) 
 {
    try 
    {
        String typeId = message.getMessageProperties().getHeaders().get("__TypeId__").toString();
        String contentType = message.getMessageProperties().getContentType();
        if (contentType != "application/json" || typeId == null || !typeId.contains(ADTO.class.toString()))
        {
            //TODO log warning
            System.out.print("type not supported by this service");
            return;
        }
        Object receivedObject = new Jackson2JsonMessageConverter().fromMessage(message);

        if (receivedObject instanceof ADTO)
        {
            receive((ADTO)receivedObject);
            System.out.print("ADTO");
        }
        //else

或者,您也可以像这样进行序列化:

....
String typeId = message.getMessageProperties().getHeaders().get("__TypeId__").toString();
byte[] binMsg =  message.getBody();
String strMsg  = new String(binMsg, StandardCharsets.UTF_8);
ObjectMapper mapper = new ObjectMapper();
if (typeId.contains("ADTO"))
{
   receive(mapper.readValue(strMsg, ADTO.class ));
}
else
...            

或者,我发现,如果你有足够高版本的 AQMP 和 Rabbit (我的 = spring-rabit 2.1.3,spring-starter-aqmp 2.1.2)你可以做这个变形:

@RabbitListener(id="multi", queues = "somequeuename")
public class SomeService 
{
    @RabbitHandler
    public void handleADTO(@Payload ADTO adto) {
         System.out.print(adto.url);
    }

    @RabbitHandler
    public void handleADTO2(@Payload ADTO2 adto) {
         System.out.print(adto.url);
    }
}

但是,开箱即用,这不起作用。您需要配置另一个 "bag of Beans":

@Bean
public SimpleRabbitListenerContainerFactory myFactory(
        SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory =
            new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    return factory;
}

@Bean
Jackson2JsonMessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
RabbitTemplate rabbitTemplate(Jackson2JsonMessageConverter converter, ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(new Jackson2JsonMessageConverter());
    return template;
}

只需将其放入您的应用程序或其他应用程序即可 class。

另见

https://docs.spring.io/spring-amqp/docs/current/reference/html/