根据 Spring AMQP 中的消费者数量使用队列
Consuming a queue based on its consumer count in Spring AMQP
我想让一个队列一次只被一个订阅者使用。因此,如果一个订阅者掉线,那么另一个订阅者将有机会订阅。
我正在寻找 Spring AMQP 中的正确方法。基于 RabbitMQ 网站上的示例,我在纯 Java 中完成了此操作。我被动声明队列,查看它的消费者计数,如果为0,则开始消费。
这是代码。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
int count = channel.queueDeclarePassive(QUEUE_NAME).getConsumerCount();
System.out.println("count is "+count);
if (count == 0) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
} else{
System.out.println("subscribed by some other processor(s)");
}
我也可以通过这种方式检查 Spring AMQP 中的订阅者数量。但是为时已晚,因为它已经在监听队列了。
@RabbitListener(queues = "q1")
public void receivedMessageQ1(String message, Channel channel){
try {
int q1 = channel.queueDeclarePassive("q1").getConsumerCount();
// do something.
} catch (IOException e) {
System.out.println("exception occurred");
}
}
简而言之,我想根据消费者数量来消费队列。我希望我清楚。
在 @RabbitListener
上设置 exclusive
标志; RabbitMQ 将只允许一个实例消费。其他实例将尝试每 5 秒侦听一次(默认情况下)。要增加间隔,请设置容器工厂的 recoveryBackOff
.
@SpringBootApplication
public class So56319999Application {
public static void main(String[] args) {
SpringApplication.run(So56319999Application.class, args);
}
@RabbitListener(queues = "so56319999", exclusive = true)
public void listen (String in) {
}
@Bean
public Queue queue() {
return new Queue("so56319999");
}
}
我想让一个队列一次只被一个订阅者使用。因此,如果一个订阅者掉线,那么另一个订阅者将有机会订阅。
我正在寻找 Spring AMQP 中的正确方法。基于 RabbitMQ 网站上的示例,我在纯 Java 中完成了此操作。我被动声明队列,查看它的消费者计数,如果为0,则开始消费。
这是代码。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
int count = channel.queueDeclarePassive(QUEUE_NAME).getConsumerCount();
System.out.println("count is "+count);
if (count == 0) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
} else{
System.out.println("subscribed by some other processor(s)");
}
我也可以通过这种方式检查 Spring AMQP 中的订阅者数量。但是为时已晚,因为它已经在监听队列了。
@RabbitListener(queues = "q1")
public void receivedMessageQ1(String message, Channel channel){
try {
int q1 = channel.queueDeclarePassive("q1").getConsumerCount();
// do something.
} catch (IOException e) {
System.out.println("exception occurred");
}
}
简而言之,我想根据消费者数量来消费队列。我希望我清楚。
在 @RabbitListener
上设置 exclusive
标志; RabbitMQ 将只允许一个实例消费。其他实例将尝试每 5 秒侦听一次(默认情况下)。要增加间隔,请设置容器工厂的 recoveryBackOff
.
@SpringBootApplication
public class So56319999Application {
public static void main(String[] args) {
SpringApplication.run(So56319999Application.class, args);
}
@RabbitListener(queues = "so56319999", exclusive = true)
public void listen (String in) {
}
@Bean
public Queue queue() {
return new Queue("so56319999");
}
}