RabbitMQ:在运行时向侦听器动态添加队列
RabbitMQ: Dynamic addition of queues to a listener at runtime
我有一个用例,我需要使用在运行时发现的队列中的消息。
这里我有一个配置 class 和一个监听器 class。我已经为两个现有队列定义了一个消费者,并希望使用来自新队列的消息,这些消息可能会在运行时发现并遵循相同的命名约定,即 queue.animals.*
此外,我还有另一项服务,它将向我发送名为“newQueues”的队列上新发现的队列名称。如果不需要,可以更改此方法,我们可以摆脱在“newQueues”上发送消息的服务。
@EnableRabbit
public class RabbitConfiguration implements RabbitListenerConfigurer {
public static final String queue1= "queue.animals.cat";
public static final String queue2= "queue.animals.dog";
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory);
}
@Bean
public DirectRabbitListenerContainerFactory rabbitListenerContainerFactory() {
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
@Bean
public RabbitListenerEndpointRegistry listenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setEndpointRegistry(listenerEndpointRegistry());
}
@Autowired
private RabbitListenerEndpointRegistry listenerEndpointRegistry;
@RabbitListener(id = "qEvent", queues = {"newQueues"})
public void processQueueEvents(String newQueueName) {
((DirectMessageListenerContainer) this.listenerEndpointRegistry.getListenerContainer("animalQContainer"))
.addQueueNames(newQueueName);
System.out.println("Received a message with the new queue name: " + newQueueName);
}
@RabbitListener(id = "animalQContainer" , queues = { queue1, queue2 })
public void processAnimals(Animal animalObj, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
System.out.println("Received a message on queue: " + queue + "data: " + animalObj);
//process animalObj
}
我目前遇到以下异常:
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'queue': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
我是 RabbitMQ 的新手,所以不确定我是否掌握了所有的部分。谢谢你的帮助。
您的问题令人困惑 - 标题是关于添加新的 queues 但随后您谈到了 JSON 异常。听起来您正在接收 non-json 内容。
到底是什么问题?
添加新的queues,
((AbsrtactMessageListenerContainer) rabbitListenEndpointRegistry.getMessageListenerContainer("animalQContainer"))
.addQeueNames("newQueue1", "newQueue2");
由于这里配置了Jackson2JsonMessageConverter,processQueueEvents方法无法解析字符串。创建了一个新的 class 并将对象传递给 processQueueEvent 方法以解决问题中提到的异常:
public void processQueueEvents(NewQueue newQueueName) {
System.out.println("Received a message on a new queue: " + newQueueName);
String name = newQueueName.toString();
我有一个用例,我需要使用在运行时发现的队列中的消息。
这里我有一个配置 class 和一个监听器 class。我已经为两个现有队列定义了一个消费者,并希望使用来自新队列的消息,这些消息可能会在运行时发现并遵循相同的命名约定,即 queue.animals.*
此外,我还有另一项服务,它将向我发送名为“newQueues”的队列上新发现的队列名称。如果不需要,可以更改此方法,我们可以摆脱在“newQueues”上发送消息的服务。
@EnableRabbit
public class RabbitConfiguration implements RabbitListenerConfigurer {
public static final String queue1= "queue.animals.cat";
public static final String queue2= "queue.animals.dog";
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory);
}
@Bean
public DirectRabbitListenerContainerFactory rabbitListenerContainerFactory() {
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
@Bean
public RabbitListenerEndpointRegistry listenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setEndpointRegistry(listenerEndpointRegistry());
}
@Autowired
private RabbitListenerEndpointRegistry listenerEndpointRegistry;
@RabbitListener(id = "qEvent", queues = {"newQueues"})
public void processQueueEvents(String newQueueName) {
((DirectMessageListenerContainer) this.listenerEndpointRegistry.getListenerContainer("animalQContainer"))
.addQueueNames(newQueueName);
System.out.println("Received a message with the new queue name: " + newQueueName);
}
@RabbitListener(id = "animalQContainer" , queues = { queue1, queue2 })
public void processAnimals(Animal animalObj, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
System.out.println("Received a message on queue: " + queue + "data: " + animalObj);
//process animalObj
}
我目前遇到以下异常:
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'queue': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
我是 RabbitMQ 的新手,所以不确定我是否掌握了所有的部分。谢谢你的帮助。
您的问题令人困惑 - 标题是关于添加新的 queues 但随后您谈到了 JSON 异常。听起来您正在接收 non-json 内容。
到底是什么问题?
添加新的queues,
((AbsrtactMessageListenerContainer) rabbitListenEndpointRegistry.getMessageListenerContainer("animalQContainer"))
.addQeueNames("newQueue1", "newQueue2");
由于这里配置了Jackson2JsonMessageConverter,processQueueEvents方法无法解析字符串。创建了一个新的 class 并将对象传递给 processQueueEvent 方法以解决问题中提到的异常:
public void processQueueEvents(NewQueue newQueueName) {
System.out.println("Received a message on a new queue: " + newQueueName);
String name = newQueueName.toString();