每个队列在 spring-rabbitmq 中创建动态容器
dynamic container creation in spring-rabbitmq per queue
我的应用程序有多个队列(队列名称将从数据库中获取)并且每个队列每天都会消耗大量数据。
为此,我需要为每个队列创建一个容器和消息侦听器,以便每个队列都有一个单独的线程。除此之外,还可以动态创建一些队列,我需要为新创建的队列分配一个容器
我的消费者 class 开始如下所示
// 下面是我的class开始的方式
@Component
public class RequestConsumer implements MessageListener {```
//and below is the code by which I am creating Message listner
@Bean
@Scope(value = "prototype")
public SimpleMessageListenerContainer simpleMessageListenerNotification(
ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer simpleMessageListenerContainer =
new SimpleMessageListenerContainer(connectionFactory);
RabbitAdmin rabbitAdmin = getRabbitAdmin(connectionFactory);
RequestConsumer RequestConsumer = (RequestConsumer) beanFactory.getBean("requestConsumer");
simpleMessageListenerContainer.setupMessageListener(RequestConsumer);
simpleMessageListenerContainer.setAutoDeclare(true);
for (String queueName : requestConsumerQueueList()) {
Queue queue = new Queue(queueName);
rabbitAdmin.declareQueue(queue);
simpleMessageListenerContainer.addQueues(queue);
}
simpleMessageListenerContainer.start();
return simpleMessageListenerContainer;
}
我当前的代码只为所有队列创建一个容器和一个 messageListner,而我希望每个队列都有单独的容器。
首先,您不应该在 bean 定义中声明队列 - 这在上下文的生命周期中还为时过早。
您也不应该在 bean 定义中调用 start()
- 同样,太早了。
你应该这样做:
@SpringBootApplication
public class So56951298Application {
public static void main(String[] args) {
SpringApplication.run(So56951298Application.class, args);
}
@Bean
public Declarables queues() {
return new Declarables(Arrays.asList(new Queue("q1"), new Queue("q2")));
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
Queue queue) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue);
container.setMessageListener(msg -> System.out.println(msg));
return container;
}
@Bean
public ApplicationRunner runner(ConnectionFactory connectionFactory, Declarables queues) {
return args -> {
queues.getDeclarables().forEach(dec -> container(connectionFactory, (Queue) dec).start());
};
}
}
框架会在适当的时候自动声明队列(只要在应用程序上下文中有 RabbitAdmin
(Spring Boot 自动配置)。
我的应用程序有多个队列(队列名称将从数据库中获取)并且每个队列每天都会消耗大量数据。 为此,我需要为每个队列创建一个容器和消息侦听器,以便每个队列都有一个单独的线程。除此之外,还可以动态创建一些队列,我需要为新创建的队列分配一个容器
我的消费者 class 开始如下所示
// 下面是我的class开始的方式
@Component
public class RequestConsumer implements MessageListener {```
//and below is the code by which I am creating Message listner
@Bean
@Scope(value = "prototype")
public SimpleMessageListenerContainer simpleMessageListenerNotification(
ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer simpleMessageListenerContainer =
new SimpleMessageListenerContainer(connectionFactory);
RabbitAdmin rabbitAdmin = getRabbitAdmin(connectionFactory);
RequestConsumer RequestConsumer = (RequestConsumer) beanFactory.getBean("requestConsumer");
simpleMessageListenerContainer.setupMessageListener(RequestConsumer);
simpleMessageListenerContainer.setAutoDeclare(true);
for (String queueName : requestConsumerQueueList()) {
Queue queue = new Queue(queueName);
rabbitAdmin.declareQueue(queue);
simpleMessageListenerContainer.addQueues(queue);
}
simpleMessageListenerContainer.start();
return simpleMessageListenerContainer;
}
我当前的代码只为所有队列创建一个容器和一个 messageListner,而我希望每个队列都有单独的容器。
首先,您不应该在 bean 定义中声明队列 - 这在上下文的生命周期中还为时过早。
您也不应该在 bean 定义中调用 start()
- 同样,太早了。
你应该这样做:
@SpringBootApplication
public class So56951298Application {
public static void main(String[] args) {
SpringApplication.run(So56951298Application.class, args);
}
@Bean
public Declarables queues() {
return new Declarables(Arrays.asList(new Queue("q1"), new Queue("q2")));
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
Queue queue) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue);
container.setMessageListener(msg -> System.out.println(msg));
return container;
}
@Bean
public ApplicationRunner runner(ConnectionFactory connectionFactory, Declarables queues) {
return args -> {
queues.getDeclarables().forEach(dec -> container(connectionFactory, (Queue) dec).start());
};
}
}
框架会在适当的时候自动声明队列(只要在应用程序上下文中有 RabbitAdmin
(Spring Boot 自动配置)。