如何在 Spring AMQP 中收听现有队列?
How to listen to an existing queue in Spring AMQP?
我有一个远程 RabbitMQ 服务器,其中有一些我想听的队列。我试过这个:
@RabbitListener(queues = "queueName")
public void receive(String message) {
System.out.println(message);
}
但它试图创建一个新队列。结果是可预测的 - 访问被拒绝。
o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue: queueName
我没有以任何其他方式声明任何队列。
如何监听远程服务器上的现有队列?另外,有没有办法检查这个队列是否存在?我看到了这条线
@RabbitListener(queues = "#{autoDeleteQueue2.name}")
在教程中。 #{queueName.name}
是什么意思?
日志和堆栈跟踪的开头:
2018-08-30 22:10:21.968 WARN 12124 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue: queueName
2018-08-30 22:10:21.991 WARN 12124 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[queueName]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:711) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:588) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:996) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
这里有一个关于如何使用 rabbitMq 监听队列的例子:
@Component
public class RabbitConsumer implements MessageListener {
@RabbitListener(bindings =
@QueueBinding(
value = @Queue(value = "${queue.topic}", durable = "true"),
exchange = @Exchange(value = "${queue.exchange}", type = ExchangeTypes.FANOUT, durable = "true")
)
)
@Override
public void onMessage(Message message) {
// ...
}
}
和配置 (application.yaml) :
queue:
topic: mytopic
exchange: myexchange
在 rabbitmq 中,consumer 与 exchange 相关联。它允许您定义必须如何使用消息(所有消费者都听所有消息吗?如果只有一个消费者阅读消息,这是否足够?...)
即使您没有代理的配置权限,也允许侦听器使用的 queueDeclarePassive
(它会检查队列是否存在)。
o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue: queueName
这只是意味着队列不存在。
@RabbitListener(queues = "#{autoDeleteQueue2.name}")
用于在运行时获取队列名称(当您有创建队列的权限时)。
例如
@Bean
public AnonymousQueue autoDeleteQueue2() {
return new AnonymousQueue();
}
Spring 将使用随机的唯一名称将该队列添加到代理。然后使用实际队列名称配置侦听器。
这是一个如何使用 Spring 集成收听特定 'queue' 的示例:
SpringIntegrationConfiguration.java
@Configuration
public class SpringIntegrationConfiguration {
@Value("${rabbitmq.queueName}")
private String queueName;
@Bean
public IntegrationFlow ampqInbound(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, queueName))
.handle(System.out::println)
.get();
}
}
ApplicationConfiguration.java
@Configuration
public class ApplicationConfiguration {
@Value("${rabbitmq.topicExchangeName}")
private String topicExchangeName;
@Value("${rabbitmq.queueName}")
private String queueName;
@Value("${rabbitmq.routingKey}")
private String routingKey;
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(topicExchangeName);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
}
}
Application.yml
rabbitmq:
topicExchangeName: spring-boot-exchange
queueName: spring-boot
routingKey: foo.bar.#
我有一个远程 RabbitMQ 服务器,其中有一些我想听的队列。我试过这个:
@RabbitListener(queues = "queueName")
public void receive(String message) {
System.out.println(message);
}
但它试图创建一个新队列。结果是可预测的 - 访问被拒绝。
o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue: queueName
我没有以任何其他方式声明任何队列。
如何监听远程服务器上的现有队列?另外,有没有办法检查这个队列是否存在?我看到了这条线
@RabbitListener(queues = "#{autoDeleteQueue2.name}")
在教程中。 #{queueName.name}
是什么意思?
日志和堆栈跟踪的开头:
2018-08-30 22:10:21.968 WARN 12124 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue: queueName
2018-08-30 22:10:21.991 WARN 12124 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[queueName]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:711) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:588) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:996) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
这里有一个关于如何使用 rabbitMq 监听队列的例子:
@Component
public class RabbitConsumer implements MessageListener {
@RabbitListener(bindings =
@QueueBinding(
value = @Queue(value = "${queue.topic}", durable = "true"),
exchange = @Exchange(value = "${queue.exchange}", type = ExchangeTypes.FANOUT, durable = "true")
)
)
@Override
public void onMessage(Message message) {
// ...
}
}
和配置 (application.yaml) :
queue:
topic: mytopic
exchange: myexchange
在 rabbitmq 中,consumer 与 exchange 相关联。它允许您定义必须如何使用消息(所有消费者都听所有消息吗?如果只有一个消费者阅读消息,这是否足够?...)
即使您没有代理的配置权限,也允许侦听器使用的 queueDeclarePassive
(它会检查队列是否存在)。
o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue: queueName
这只是意味着队列不存在。
@RabbitListener(queues = "#{autoDeleteQueue2.name}")
用于在运行时获取队列名称(当您有创建队列的权限时)。
例如
@Bean
public AnonymousQueue autoDeleteQueue2() {
return new AnonymousQueue();
}
Spring 将使用随机的唯一名称将该队列添加到代理。然后使用实际队列名称配置侦听器。
这是一个如何使用 Spring 集成收听特定 'queue' 的示例:
SpringIntegrationConfiguration.java
@Configuration
public class SpringIntegrationConfiguration {
@Value("${rabbitmq.queueName}")
private String queueName;
@Bean
public IntegrationFlow ampqInbound(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, queueName))
.handle(System.out::println)
.get();
}
}
ApplicationConfiguration.java
@Configuration
public class ApplicationConfiguration {
@Value("${rabbitmq.topicExchangeName}")
private String topicExchangeName;
@Value("${rabbitmq.queueName}")
private String queueName;
@Value("${rabbitmq.routingKey}")
private String routingKey;
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(topicExchangeName);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
}
}
Application.yml
rabbitmq:
topicExchangeName: spring-boot-exchange
queueName: spring-boot
routingKey: foo.bar.#