Spring RabbitMQ - 在带有@RabbitListener 配置的服务上使用手动通道确认
Spring RabbitMQ - using manual channel acknowledgement on a service with @RabbitListener configuration
如何在不使用自动确认的情况下手动确认消息?
有没有办法将手动确认与 @RabbitListener 和 @EnableRabbit 风格的配置一起使用?
大多数文档告诉我们使用 SimpleMessageListenerContainer 和 ChannelAwareMessageListener。
然而,这样做我们就失去了注解(annotation)所提供的灵活性。
我已将我的服务配置如下:
@Service
public class EventReceiver {
@Autowired
private MessageSender messageSender;
@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order) throws Exception {
// code for processing order
}
我的 RabbitConfiguration 如下
@EnableRabbit
public class RabbitApplication implements RabbitListenerConfigurer {
public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class, args);
}
@Bean
public MappingJackson2MessageConverter jackson2Converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
return converter;
@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setMaxConcurrentConsumers(5);
factory.setMessageConverter((MessageConverter) jackson2Converter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
return connectionFactory;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setContainerFactory(myRabbitListenerContainerFactory());
}
@Autowired
private EventReceiver receiver;
}
}
任何有关如何调整手动通道确认以及上述配置样式的帮助将不胜感激。
如果我们实现 ChannelAwareMessageListener,那么 onMessage 签名将会改变。
我们可以在服务上实现 ChannelAwareMessageListener 吗?
将 Channel 添加到 @RabbitListener 方法的参数中……
// Listener signature suggested by the answer: in addition to the converted payload,
// the method declares the Channel and the delivery-tag header as parameters so that
// the body (elided below) can acknowledge or reject the delivery itself.
@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
...
}
并在调用 basicAck、basicReject 时使用该标签(tag)。
编辑
/**
 * Self-contained demo of manual acknowledgement with {@code @RabbitListener}:
 * sends one message to queue {@code so38728668}, waits for the listener to
 * acknowledge it, then shuts the application context down.
 */
@SpringBootApplication
@EnableRabbit
public class So38728668Application {

    public static void main(String[] args) throws Exception {
        // try-with-resources closes the context even if sending or waiting throws.
        try (ConfigurableApplicationContext context =
                SpringApplication.run(So38728668Application.class, args)) {
            context.getBean(RabbitTemplate.class).convertAndSend("", "so38728668", "foo");
            // Wait up to 60 seconds for the listener to count the latch down.
            context.getBean(Listener.class).latch.await(60, TimeUnit.SECONDS);
        }
    }

    /** Declares the queue the demo publishes to and listens on. */
    @Bean
    public Queue so38728668() {
        return new Queue("so38728668");
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    /** Listener that prints the payload and acknowledges the delivery manually. */
    public static class Listener {

        private final CountDownLatch latch = new CountDownLatch(1);

        /**
         * @param payload the message body as a string
         * @param channel the channel the message was delivered on
         * @param tag     the delivery tag taken from the message headers
         * @throws IOException if the acknowledgement cannot be sent
         */
        @RabbitListener(queues = "so38728668")
        public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
                throws IOException {
            System.out.println(payload);
            // false = acknowledge only this delivery, not all earlier ones.
            channel.basicAck(tag, false);
            latch.countDown();
        }
    }
}
application.properties:
spring.rabbitmq.listener.acknowledge-mode=manual
感谢 Gary 的帮助,我终于解决了这个问题。为了方便其他人,我把解决方法记录在这里。
这些内容应当作为标准文档的一部分写入 Spring AMQP 参考文档。
服务类如下。
@Service
public class Consumer {
@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order, Channel channel) throws Exception {
// the above methodname can be anything but should have channel as second signature
channel.basicConsume(eventQueue, false, channel.getDefaultConsumer());
// Get the delivery tag
long deliveryTag = channel.basicGet(eventQueue, false).getEnvelope().getDeliveryTag();
try {
// code for processing order
catch(Exception) {
// handle exception
channel.basicReject(deliveryTag, true);
}
// If all logic is successful
channel.basicAck(deliveryTag, false);
}
配置也修改如下
/**
 * Revised configuration from the follow-up answer: instead of a custom container
 * factory it only installs a handler-method factory that uses the Jackson converter.
 */
public class RabbitApplication implements RabbitListenerConfigurer {

    private static final Logger log = LoggerFactory.getLogger(RabbitApplication.class);

    @Autowired
    private Consumer consumer;

    public static void main(String[] args) {
        SpringApplication.run(RabbitApplication.class, args);
    }

    /** JSON converter applied to listener method payloads. */
    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        return new MappingJackson2MessageConverter();
    }

    /** Handler-method factory wired with the Jackson converter bean. */
    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory handlerFactory = new DefaultMessageHandlerMethodFactory();
        handlerFactory.setMessageConverter(jackson2Converter());
        return handlerFactory;
    }

    /** Registers the handler-method factory with the listener endpoint registrar. */
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }
    ...
}
注意:无需配置 RabbitConnectionFactory 或 containerFactory 等,因为注解会隐式地处理所有这些。
如果您需要使用 ChannelAwareMessageListener 的 onMessage() 方法,可以按如下方式操作。
/**
 * Channel-aware listener that logs each delivery and acknowledges it manually.
 */
@Component
public class MyMessageListener implements ChannelAwareMessageListener {

    private static final Logger log = LoggerFactory.getLogger(MyMessageListener.class);

    /**
     * Handles one raw message and acks it on the channel it arrived on.
     *
     * @param message the delivered message
     * @param channel the channel the message was delivered on
     * @throws IOException if the acknowledgement cannot be sent
     */
    @Override
    public void onMessage(Message message, Channel channel) throws IOException {
        log.info("Message received.");
        // do something with the message
        // false = acknowledge only this delivery, not all earlier ones.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
对应的 RabbitMQ 配置如下:
/**
 * Explicit (non-annotation) wiring: declares the connection, a durable queue bound to a
 * topic exchange, and a listener container that runs {@code MyMessageListener} in
 * MANUAL acknowledge mode.
 */
@Configuration
public class RabbitConfig {

    public static final String topicExchangeName = "exchange1";
    public static final String queueName = "queue1";
    public static final String routingKey = "queue1.route.#";

    /** Caching connection factory for the broker on localhost:5672, virtual host vHost1. */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("vHost1");
        factory.setUsername("xxxx");
        factory.setPassword("xxxxxxxxxx");
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    /** Durable queue (second constructor argument is {@code true}). */
    @Bean
    Queue queue() {
        return new Queue(queueName, true);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }

    /** Binds the queue to the topic exchange using the routing-key pattern. */
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }

    /**
     * Listener container for {@code queue1}: concurrency "4", prefetch count 20,
     * manual acknowledgement delegated to the supplied listener.
     */
    @Bean
    public SimpleMessageListenerContainer listenerContainer(MyMessageListener myRabbitMessageListener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueNames(queueName);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setPrefetchCount(20);
        container.setConcurrency("4");
        container.setMessageListener(myRabbitMessageListener);
        return container;
    }
}
如何在不使用自动确认的情况下手动确认消息?
有没有办法将手动确认与 @RabbitListener 和 @EnableRabbit 风格的配置一起使用?
大多数文档告诉我们使用 SimpleMessageListenerContainer 和 ChannelAwareMessageListener。
然而,这样做我们就失去了注解(annotation)所提供的灵活性。
我已将我的服务配置如下:
@Service
public class EventReceiver {
@Autowired
private MessageSender messageSender;
@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order) throws Exception {
// code for processing order
}
我的 RabbitConfiguration 如下
@EnableRabbit
public class RabbitApplication implements RabbitListenerConfigurer {
public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class, args);
}
@Bean
public MappingJackson2MessageConverter jackson2Converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
return converter;
@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setMaxConcurrentConsumers(5);
factory.setMessageConverter((MessageConverter) jackson2Converter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
return connectionFactory;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setContainerFactory(myRabbitListenerContainerFactory());
}
@Autowired
private EventReceiver receiver;
}
}
任何有关如何调整手动通道确认以及上述配置样式的帮助将不胜感激。 如果我们实现 ChannelAwareMessageListener,那么 onMessage 签名将会改变。 我们可以在服务上实现 ChannelAwareMessageListener 吗?
将 Channel 添加到 @RabbitListener 方法的参数中……
// Listener signature suggested by the answer: in addition to the converted payload,
// the method declares the Channel and the delivery-tag header as parameters so that
// the body (elided below) can acknowledge or reject the delivery itself.
@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
...
}
并在调用 basicAck、basicReject 时使用该标签(tag)。
编辑
/**
 * Self-contained demo of manual acknowledgement with {@code @RabbitListener}:
 * sends one message to queue {@code so38728668}, waits for the listener to
 * acknowledge it, then shuts the application context down.
 */
@SpringBootApplication
@EnableRabbit
public class So38728668Application {

    public static void main(String[] args) throws Exception {
        // try-with-resources closes the context even if sending or waiting throws.
        try (ConfigurableApplicationContext context =
                SpringApplication.run(So38728668Application.class, args)) {
            context.getBean(RabbitTemplate.class).convertAndSend("", "so38728668", "foo");
            // Wait up to 60 seconds for the listener to count the latch down.
            context.getBean(Listener.class).latch.await(60, TimeUnit.SECONDS);
        }
    }

    /** Declares the queue the demo publishes to and listens on. */
    @Bean
    public Queue so38728668() {
        return new Queue("so38728668");
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    /** Listener that prints the payload and acknowledges the delivery manually. */
    public static class Listener {

        private final CountDownLatch latch = new CountDownLatch(1);

        /**
         * @param payload the message body as a string
         * @param channel the channel the message was delivered on
         * @param tag     the delivery tag taken from the message headers
         * @throws IOException if the acknowledgement cannot be sent
         */
        @RabbitListener(queues = "so38728668")
        public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
                throws IOException {
            System.out.println(payload);
            // false = acknowledge only this delivery, not all earlier ones.
            channel.basicAck(tag, false);
            latch.countDown();
        }
    }
}
application.properties:
spring.rabbitmq.listener.acknowledge-mode=manual
感谢 Gary 的帮助,我终于解决了这个问题。为了方便其他人,我把解决方法记录在这里。 这些内容应当作为标准文档的一部分写入 Spring AMQP 参考文档。 服务类如下。
@Service
public class Consumer {
@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order, Channel channel) throws Exception {
// the above methodname can be anything but should have channel as second signature
channel.basicConsume(eventQueue, false, channel.getDefaultConsumer());
// Get the delivery tag
long deliveryTag = channel.basicGet(eventQueue, false).getEnvelope().getDeliveryTag();
try {
// code for processing order
catch(Exception) {
// handle exception
channel.basicReject(deliveryTag, true);
}
// If all logic is successful
channel.basicAck(deliveryTag, false);
}
配置也修改如下
/**
 * Revised configuration from the follow-up answer: instead of a custom container
 * factory it only installs a handler-method factory that uses the Jackson converter.
 */
public class RabbitApplication implements RabbitListenerConfigurer {

    private static final Logger log = LoggerFactory.getLogger(RabbitApplication.class);

    @Autowired
    private Consumer consumer;

    public static void main(String[] args) {
        SpringApplication.run(RabbitApplication.class, args);
    }

    /** JSON converter applied to listener method payloads. */
    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        return new MappingJackson2MessageConverter();
    }

    /** Handler-method factory wired with the Jackson converter bean. */
    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory handlerFactory = new DefaultMessageHandlerMethodFactory();
        handlerFactory.setMessageConverter(jackson2Converter());
        return handlerFactory;
    }

    /** Registers the handler-method factory with the listener endpoint registrar. */
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }
    ...
}
注意:无需配置 RabbitConnectionFactory 或 containerFactory 等,因为注解会隐式地处理所有这些。
如果您需要使用 ChannelAwareMessageListener 的 onMessage() 方法,可以按如下方式操作。
/**
 * Channel-aware listener that logs each delivery and acknowledges it manually.
 */
@Component
public class MyMessageListener implements ChannelAwareMessageListener {

    private static final Logger log = LoggerFactory.getLogger(MyMessageListener.class);

    /**
     * Handles one raw message and acks it on the channel it arrived on.
     *
     * @param message the delivered message
     * @param channel the channel the message was delivered on
     * @throws IOException if the acknowledgement cannot be sent
     */
    @Override
    public void onMessage(Message message, Channel channel) throws IOException {
        log.info("Message received.");
        // do something with the message
        // false = acknowledge only this delivery, not all earlier ones.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
对应的 RabbitMQ 配置如下:
/**
 * Explicit (non-annotation) wiring: declares the connection, a durable queue bound to a
 * topic exchange, and a listener container that runs {@code MyMessageListener} in
 * MANUAL acknowledge mode.
 */
@Configuration
public class RabbitConfig {

    public static final String topicExchangeName = "exchange1";
    public static final String queueName = "queue1";
    public static final String routingKey = "queue1.route.#";

    /** Caching connection factory for the broker on localhost:5672, virtual host vHost1. */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("vHost1");
        factory.setUsername("xxxx");
        factory.setPassword("xxxxxxxxxx");
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    /** Durable queue (second constructor argument is {@code true}). */
    @Bean
    Queue queue() {
        return new Queue(queueName, true);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }

    /** Binds the queue to the topic exchange using the routing-key pattern. */
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }

    /**
     * Listener container for {@code queue1}: concurrency "4", prefetch count 20,
     * manual acknowledgement delegated to the supplied listener.
     */
    @Bean
    public SimpleMessageListenerContainer listenerContainer(MyMessageListener myRabbitMessageListener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueNames(queueName);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setPrefetchCount(20);
        container.setConcurrency("4");
        container.setMessageListener(myRabbitMessageListener);
        return container;
    }
}