RabbitMQ:在运行时向侦听器动态添加队列

RabbitMQ: Dynamic addition of queues to a listener at runtime

我有一个用例,我需要使用在运行时发现的队列中的消息。

这里我有一个配置 class 和一个监听器 class。我已经为两个现有队列定义了一个消费者,并希望使用来自新队列的消息,这些消息可能会在运行时发现并遵循相同的命名约定,即 queue.animals.*

此外,我还有另一项服务,它将向我发送名为“newQueues”的队列上新发现的队列名称。如果不需要,可以更改此方法,我们可以摆脱在“newQueues”上发送消息的服务。

@EnableRabbit
public class RabbitConfiguration implements RabbitListenerConfigurer {

    public static final String queue1= "queue.animals.cat";
    public static final String queue2= "queue.animals.dog";

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory);
    }
    @Bean
    public DirectRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

    @Bean
    public RabbitListenerEndpointRegistry listenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setEndpointRegistry(listenerEndpointRegistry());
    }

@Autowired
private RabbitListenerEndpointRegistry listenerEndpointRegistry;

@RabbitListener(id = "qEvent", queues = {"newQueues"})
public void processQueueEvents(String newQueueName) {
    ((DirectMessageListenerContainer) this.listenerEndpointRegistry.getListenerContainer("animalQContainer"))
        .addQueueNames(newQueueName);

    System.out.println("Received a message with the new queue name: " + newQueueName);
    
}

@RabbitListener(id = "animalQContainer" , queues = { queue1, queue2 })
public void processAnimals(Animal animalObj, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
    System.out.println("Received a message on queue: " + queue + "data: " + animalObj);
    //process animalObj
}

我目前遇到以下异常:

Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'queue': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')

我是 RabbitMQ 的新手,所以不确定我是否掌握了所有的部分。谢谢你的帮助。

您的问题令人困惑 - 标题是关于添加新的 queues 但随后您谈到了 JSON 异常。听起来您正在接收 non-json 内容。

到底是什么问题?

添加新的queues,

((AbsrtactMessageListenerContainer) rabbitListenEndpointRegistry.getMessageListenerContainer("animalQContainer"))
    .addQeueNames("newQueue1", "newQueue2");

由于这里配置了Jackson2JsonMessageConverter,processQueueEvents方法无法解析字符串。创建了一个新的 class 并将对象传递给 processQueueEvent 方法以解决问题中提到的异常:

    public void processQueueEvents(NewQueue newQueueName) {
        System.out.println("Received a message on a new queue: " + newQueueName);
        String name = newQueueName.toString();