将队列的动态列表传递给队列侦听器
Passing dynamic list of queues into a queue listener
我有一个 Spring 引导应用程序,它为一部分队列旋转消费者,我希望能够在 运行 时间向这些消费者添加队列。
我已经安装了事件交换插件 (https://www.rabbitmq.com/event-exchange.html) 并创建了一个绑定到 amq.rabbitmq.event 交换的专用队列。当我静态声明队列时,我可以看到传入的事件。
我该如何完成这个 运行 时间魔法?我见过人们使用 属性 文件,但我不希望在 运行 时间内添加更多队列时不必修改 属性 文件
@Component
public class MessageConsumer {
List<String> allQueues = new ArrayList<String>();
public MessageConsumer() {
allQueues.add("queue1");
allQueues.add("queue2");
allQueues.add("queue3");
}
@RabbitListener(id = "event", queues = {"custom-emp-queue-events"}) // create this queue in rabbitmq management, bound to amqp exchange
public void processQueueEvents(Message message) {
... add the queue to the allQueues list on queue.created ...
}
@RabbitListener(id = "process", queues = allQueues.stream().toArray(String[]::new) ) // this is where the "issue" is
public void processMessageFromQueues(String messageAsJson) {
... process message ...
}
}
这可以用那边的 SpEL 表达式来完成:
@RabbitListener(id = "process", queues = "#{messageConsumer.allQueues}" )
但是你必须为 allQueues
添加一个 public getter。
在参考手册中查看更多信息:https://docs.spring.io/spring-amqp/docs/2.1.3.RELEASE/reference/html/_reference.html#async-annotation-driven
更新
@Autowired
private RabbitListenerEndpointRegistry listenerEdnpointRegistry;
@RabbitListener(id = "event", queues = {"custom-emp-queue-events"}) // create this queue in rabbitmq management, bound to amqp exchange
public void processQueueEvents(Message message) {
((AbstractMessageListenerContainer) this.listenerEdnpointRegistry.getListenerContainer("process")).addQueueNames(...);
}
我有一个 Spring 引导应用程序,它为一部分队列旋转消费者,我希望能够在 运行 时间向这些消费者添加队列。
我已经安装了事件交换插件 (https://www.rabbitmq.com/event-exchange.html) 并创建了一个绑定到 amq.rabbitmq.event 交换的专用队列。当我静态声明队列时,我可以看到传入的事件。
我该如何完成这个 运行 时间魔法?我见过人们使用 属性 文件,但我不希望在 运行 时间内添加更多队列时不必修改 属性 文件
@Component
public class MessageConsumer {
List<String> allQueues = new ArrayList<String>();
public MessageConsumer() {
allQueues.add("queue1");
allQueues.add("queue2");
allQueues.add("queue3");
}
@RabbitListener(id = "event", queues = {"custom-emp-queue-events"}) // create this queue in rabbitmq management, bound to amqp exchange
public void processQueueEvents(Message message) {
... add the queue to the allQueues list on queue.created ...
}
@RabbitListener(id = "process", queues = allQueues.stream().toArray(String[]::new) ) // this is where the "issue" is
public void processMessageFromQueues(String messageAsJson) {
... process message ...
}
}
这可以用那边的 SpEL 表达式来完成:
@RabbitListener(id = "process", queues = "#{messageConsumer.allQueues}" )
但是你必须为 allQueues
添加一个 public getter。
在参考手册中查看更多信息:https://docs.spring.io/spring-amqp/docs/2.1.3.RELEASE/reference/html/_reference.html#async-annotation-driven
更新
@Autowired
private RabbitListenerEndpointRegistry listenerEdnpointRegistry;
@RabbitListener(id = "event", queues = {"custom-emp-queue-events"}) // create this queue in rabbitmq management, bound to amqp exchange
public void processQueueEvents(Message message) {
((AbstractMessageListenerContainer) this.listenerEdnpointRegistry.getListenerContainer("process")).addQueueNames(...);
}