Spring-amqp - 队列中的最后一条消息在我关闭服务器之前保持未确认状态
Spring-amqp -The last message in queue remain unacknowledged until I close the server
我是 spring-amqp 的新手。我正在尝试手动确认消息而不是使用自动确认。
我看到最后一条消息正在管理控制台中取消。
image for unacked message in managemnet console.
但是队列是空的。
一旦我停止服务器,最后一条消息就会得到确认。我该如何处理这个问题以及我如何在日志中打印消息id/information 未被确认..
这是我实现的代码。
RabbitConfig.java:
public class RabbitMQConfig {
final static String queueName = "spring-boot";
@Bean
Queue queue() {
return new Queue(queueName, true,false,false,null);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("spring-boot-exchange");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(queueName);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
@Bean
Consumer receiver() {
return new Consumer();
}
@Bean
MessageListenerAdapter listenerAdapter(Consumer receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
Consumer.java
public class 消费者实现 ChannelAwareMessageListener{
@RabbitListener(queues = "spring-boot")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException, InterruptedException {
Thread.sleep(500);
channel.basicAck(tag, true);
System.out.println(tag + "received");
}
@Override
public void onMessage(Message arg0, Channel arg1) throws Exception {
// TODO Auto-generated method stub
}
生产者端点:
@RestController
public class HelloController {
private final RabbitTemplate rabbitTemplate;
public HelloController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
// Call this end point from the postman or the browser then check in the
// rabbitmq server
@GetMapping(path = "/hello")
public String sayHello() throws InterruptedException {
// Producer operation
for (int i = 0; i < 100; i++) {
Thread.sleep(500);
rabbitTemplate.convertAndSend(RabbitMQConfig.queueName, "Hello World");
}
return "hello";
}
@GetMapping(path = "/hellotwo")
public String sayHellotwo() throws InterruptedException {
// Producer operation
for (int i = 0; i < 50; i++) {
Thread.sleep(500);
rabbitTemplate.convertAndSend(RabbitMQConfig.queueName, "SEcond message");
}
return "hellotwo";
}
您有两个侦听器容器; container
bean 和框架为 @RabbitListener
创建的一个 bean。
我不完全确定如果没有 运行 我自己测试会发生什么,但我怀疑问题是你试图从简单的 MessageListenerAdapter
.[=18= 调用 receiveMessage
]
该适配器仅设计用于调用带有一个参数的方法(从 Message
转换而来)。此外,该适配器不知道如何映射 @Header
参数。我怀疑交付失败,并且由于您使用的是手动确认,由于未确认交付和默认 qos (1),因此不再尝试向该容器交付。
您不需要 container
bean;而是配置消息侦听器容器工厂以设置确认模式。参见 the documentation。
如果您是 spring-amqp 的新手;你为什么认为你需要手动确认?默认模式(auto)意味着容器将为你 ack/nack(NONE 是传统的 rabbit auto-ack)。 Spring.
使用手动确认并不常见
我是 spring-amqp 的新手。我正在尝试手动确认消息而不是使用自动确认。
我看到最后一条消息正在管理控制台中取消。
image for unacked message in managemnet console. 但是队列是空的。
一旦我停止服务器,最后一条消息就会得到确认。我该如何处理这个问题以及我如何在日志中打印消息id/information 未被确认..
这是我实现的代码。
RabbitConfig.java:
public class RabbitMQConfig {
final static String queueName = "spring-boot";
@Bean
Queue queue() {
return new Queue(queueName, true,false,false,null);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("spring-boot-exchange");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(queueName);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
@Bean
Consumer receiver() {
return new Consumer();
}
@Bean
MessageListenerAdapter listenerAdapter(Consumer receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
Consumer.java
public class 消费者实现 ChannelAwareMessageListener{
@RabbitListener(queues = "spring-boot")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException, InterruptedException {
Thread.sleep(500);
channel.basicAck(tag, true);
System.out.println(tag + "received");
}
@Override
public void onMessage(Message arg0, Channel arg1) throws Exception {
// TODO Auto-generated method stub
}
生产者端点:
@RestController public class HelloController {
private final RabbitTemplate rabbitTemplate;
public HelloController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
// Call this end point from the postman or the browser then check in the
// rabbitmq server
@GetMapping(path = "/hello")
public String sayHello() throws InterruptedException {
// Producer operation
for (int i = 0; i < 100; i++) {
Thread.sleep(500);
rabbitTemplate.convertAndSend(RabbitMQConfig.queueName, "Hello World");
}
return "hello";
}
@GetMapping(path = "/hellotwo")
public String sayHellotwo() throws InterruptedException {
// Producer operation
for (int i = 0; i < 50; i++) {
Thread.sleep(500);
rabbitTemplate.convertAndSend(RabbitMQConfig.queueName, "SEcond message");
}
return "hellotwo";
}
您有两个侦听器容器; container
bean 和框架为 @RabbitListener
创建的一个 bean。
我不完全确定如果没有 运行 我自己测试会发生什么,但我怀疑问题是你试图从简单的 MessageListenerAdapter
.[=18= 调用 receiveMessage
]
该适配器仅设计用于调用带有一个参数的方法(从 Message
转换而来)。此外,该适配器不知道如何映射 @Header
参数。我怀疑交付失败,并且由于您使用的是手动确认,由于未确认交付和默认 qos (1),因此不再尝试向该容器交付。
您不需要 container
bean;而是配置消息侦听器容器工厂以设置确认模式。参见 the documentation。
如果您是 spring-amqp 的新手;你为什么认为你需要手动确认?默认模式(auto)意味着容器将为你 ack/nack(NONE 是传统的 rabbit auto-ack)。 Spring.
使用手动确认并不常见