Dynamically adding queue consumers in Spring: issue
I am trying to add new queues to an existing listener in exclusive mode, but I am running into a problem.
Code:
@GetMapping(value = "/consumer")
public void consumer() {
Set<String> queue = new HashSet<>(Arrays.asList("Test queue1", "Test queue2", "Test queue3"));
// for (int i = 0; i < 10; i++) {
addQueueToListener("Test container1", queue);
// }
}
public void addQueueToListener(String containerId, String queueName) {
SimpleMessageListenerContainer listener = (SimpleMessageListenerContainer)
rabbitListenerEndpointRegistry.getListenerContainer(containerId);
if (Objects.nonNull(listener)) {
String[] queueNames = listener.getQueueNames();
System.out.println("Queue Names: " + Arrays.toString(queueNames) + " going to Add: " + queueName);
System.out.println("search result for queue name: " + queueName + " -->" + Arrays.binarySearch(queueNames, queueName));
//do not add same queue name to listener
if (Arrays.binarySearch(queueNames, queueName) < 0) {
System.out.println("Adding queue: " + queueName);
listener.addQueueNames(queueName);
System.out.println("added queue: " + queueName);
}
}
}
public void addQueueToListener(String containerId, Set<String> queueName) {
for (String queue : queueName) {
addQueueToListener(containerId, queue);
}
}
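A side note on the duplicate check in the code above: Arrays.binarySearch only gives a meaningful result on an array sorted in ascending order, and as far as I can tell getQueueNames() makes no such promise. The following is a minimal, standalone sketch of the pitfall and of an order-independent alternative; the class name QueueNameCheck and the helper containsQueue are hypothetical and not part of the original code.

```java
import java.util.Arrays;

final class QueueNameCheck {

    // Hypothetical helper: linear membership test that works for any ordering.
    static boolean containsQueue(String[] queueNames, String queueName) {
        return Arrays.asList(queueNames).contains(queueName);
    }

    public static void main(String[] args) {
        // Deliberately unsorted, as a container's queue list may be.
        String[] queueNames = {"Test queue3", "Test queue1", "Test queue2"};

        // binarySearch assumes ascending order; with this ordering it reports
        // "Test queue3" as missing (negative result) although it is present.
        System.out.println(Arrays.binarySearch(queueNames, "Test queue3") < 0);

        // The linear check finds the name regardless of order.
        System.out.println(containsQueue(queueNames, "Test queue3"));
    }
}
```

With a check like this, a queue that is already attached to the container would not be added a second time just because the array happens to be unsorted.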
Rabbit listener code:
@RabbitListener(id = "Test container1", queues = "#{rabbitMQConsumer.getEntriesQueueName()}", containerFactory = "consumerBatchContainerFactory")
public void receiveMessageBatch(List<Message> messages) throws InterruptedException, IOException {
for (Message message : messages) {
Thread.sleep(30000);
// ObjectMapper jsonObjectMapper = new ObjectMapper();
// Employee employee = jsonObjectMapper.readValue(Arrays.toString(message.getBody()), Employee.class);
String queueName = (String) message.getMessageProperties().getHeaders().get("queueName");
System.out.println("RabbitMQConsumer.receiveMessage QueueName:" + queueName);
// System.out.println("Received Message From RabbitMQ: " + employee + " QueueName:"+ queueName);
}
}
public List<String> getEntriesQueueName() {
return Collections.emptyList();
// return Arrays.asList("Test queue1", "Test queue2", "Test queue3");
}
Error when I set listener.setIsExclusive(true):
Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - queue 'Test queue3' in vhost 'my_vhost' in exclusive use, class-id=60, method-id=20)
I even tried setting exclusive on the @RabbitListener:
@RabbitListener(id = "Test container1", queues = "#{rabbitMQConsumer.getEntriesQueueName()}", containerFactory = "consumerBatchContainerFactory", exclusive = true)
But I get the same error.
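There is a bug in the listener container when you call addQueueNames: we end up recycling the consumers multiple times, and there is a race because we don't wait for the consumers to be cancelled.
Use listener.addQueues(new Queue(queueName)); instead.
Also, I moved setExclusive to the container factory bean...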
factory.setContainerCustomizer(container -> container.setExclusive(true));
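Putting the two changes together, a minimal sketch might look like the following. It assumes Spring AMQP is on the classpath and that the queues already exist on the broker. The class names ExclusiveConsumerConfig and QueueRegistrar are hypothetical, and the batch-related settings of the original consumerBatchContainerFactory are not shown in the question, so they are left out here.

```java
import java.util.Arrays;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@Configuration
class ExclusiveConsumerConfig {

    @Bean
    SimpleRabbitListenerContainerFactory consumerBatchContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Exclusive mode is applied to every container this factory creates,
        // instead of being set on the container after it has started.
        factory.setContainerCustomizer(container -> container.setExclusive(true));
        return factory;
    }
}

@Component
class QueueRegistrar {

    private final RabbitListenerEndpointRegistry registry;

    QueueRegistrar(RabbitListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    void addQueueToListener(String containerId, String queueName) {
        MessageListenerContainer container = registry.getListenerContainer(containerId);
        if (container instanceof SimpleMessageListenerContainer) {
            SimpleMessageListenerContainer listener = (SimpleMessageListenerContainer) container;
            // Order-independent duplicate check, then addQueues rather than addQueueNames.
            if (!Arrays.asList(listener.getQueueNames()).contains(queueName)) {
                listener.addQueues(new Queue(queueName));
            }
        }
    }
}
```

The instanceof check replaces the unchecked cast from the question, so a different container type is skipped rather than failing with a ClassCastException.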