spring ampq注解驱动一个队列两个监听区分路由key
spring ampq annotation driven one queue two listeners distinguish routing key
其实我不明白运行。也许我误解了什么,无论如何这是不可能的。我正在尝试在同一个队列、同一个交换器上配置 2 个侦听器,但只有路由键应该不同。我的问题是不知何故事情变得一团糟。结果是听众 A 收到了发给听众 B 的消息。但只是有时一切正常。有什么建议吗?
我的配置
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(getHostname());
connectionFactory.setUsername(getUsername());
connectionFactory.setPassword(getPassword());
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setMessageConverter(new CustomMessageConverter());
factory.setConnectionFactory(connectionFactory());
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
factory.setConcurrentConsumers(10);
factory.setMaxConcurrentConsumers(10);
return factory;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new MappingJackson2MessageConverter());
return factory;
}
我的听众 A
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE, durable = "true"), exchange = @Exchange(value = EXCHANGE, type = "topic", durable = "true", ignoreDeclarationExceptions = "true"), key = "routingKeyA"))
public String myListenerA(@Payload PayloadA payload, @Header(AmqpHeaders.CORRELATION_ID) byte[] correlationId) {
return SUCCESS_RESPONSE;
}
MyListener B
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE, durable = "true"), exchange = @Exchange(value = EXCHANGE, type = "topic", durable = "true", ignoreDeclarationExceptions = "true"), key = "routingKeyB"))
public String myListenerB(@Payload PayloadB payload, @Header(AmqpHeaders.CORRELATION_ID) byte[] correlationId) {
return SUCCESS_RESPONSE;
}
附加信息:我在这个队列中有 20 个消费者。
提前致谢!
RabbitMQ 不是这样工作的;与 JMS 不同,无法从队列中 select 消息(例如基于路由键)。
您所做的只是使用 2 个不同的路由键将同一个队列绑定到交换器。所以,是的,无论消息是如何到达队列的,任何一个监听器都会收到消息。
使用 RabbitMQ,每个侦听器都需要一个单独的队列。当生产者发布到交换器时,代理将根据他使用的路由键将消息路由到正确的队列。
如果您有每个侦听器的多个实例,将相应地分发消息(每个队列只传递一个)。
其实我不明白运行。也许我误解了什么,无论如何这是不可能的。我正在尝试在同一个队列、同一个交换器上配置 2 个侦听器,但只有路由键应该不同。我的问题是不知何故事情变得一团糟。结果是听众 A 收到了发给听众 B 的消息。但只是有时一切正常。有什么建议吗?
我的配置
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(getHostname());
connectionFactory.setUsername(getUsername());
connectionFactory.setPassword(getPassword());
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setMessageConverter(new CustomMessageConverter());
factory.setConnectionFactory(connectionFactory());
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
factory.setConcurrentConsumers(10);
factory.setMaxConcurrentConsumers(10);
return factory;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new MappingJackson2MessageConverter());
return factory;
}
我的听众 A
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE, durable = "true"), exchange = @Exchange(value = EXCHANGE, type = "topic", durable = "true", ignoreDeclarationExceptions = "true"), key = "routingKeyA"))
public String myListenerA(@Payload PayloadA payload, @Header(AmqpHeaders.CORRELATION_ID) byte[] correlationId) {
return SUCCESS_RESPONSE;
}
MyListener B
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE, durable = "true"), exchange = @Exchange(value = EXCHANGE, type = "topic", durable = "true", ignoreDeclarationExceptions = "true"), key = "routingKeyB"))
public String myListenerB(@Payload PayloadB payload, @Header(AmqpHeaders.CORRELATION_ID) byte[] correlationId) {
return SUCCESS_RESPONSE;
}
附加信息:我在这个队列中有 20 个消费者。 提前致谢!
RabbitMQ 不是这样工作的;与 JMS 不同,无法从队列中 select 消息(例如基于路由键)。
您所做的只是使用 2 个不同的路由键将同一个队列绑定到交换器。所以,是的,无论消息是如何到达队列的,任何一个监听器都会收到消息。
使用 RabbitMQ,每个侦听器都需要一个单独的队列。当生产者发布到交换器时,代理将根据他使用的路由键将消息路由到正确的队列。
如果您有每个侦听器的多个实例,将相应地分发消息(每个队列只传递一个)。