如何使用 SimpleMessageListenerContainer bean 从 RabbitMq 队列破坏性地读取消息
How to read messages destructively from RabbitMq queue using SimpleMessageListenerContainer bean
我需要帮助来实现 SimpleMessageListenerContainer,只要消息接收者读取消息,它就会以非事务方式删除消息。
在我的例子中,无论事务成功或不成功,放入队列的消息都挂在某个地方(不在队列中),并且在从队列读取的每个操作中重复处理。因此放入队列的所有其他消息仍然无法访问,并且每次只重新处理第一个消息。
另一件奇怪的事情是,我在队列中看不到消息 landing/queuing,即 Rabbit 管理控制台上的队列深度从未改变,只是消息速率在每次写入时跳跃到队列中。
下面是我的 Java 配置的代码片段。如果有人能指出这里的错误,将会有很大的帮助:-
@Configuration
@EnableJpaRepositories(basePackages={"com.xxx.muppets"})
public class MuppetMessageConsumerConfig {
private ApplicationContext applicationContext;
@Value("${rabbit.queue.name}")
private String queueName;
@Autowired
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("spring-boot-exchange");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(queueName);
}
@Bean
MuppetMsgReceiver muppetMsgReceiver(){
return new MuppetMsgReceiver();
}
@Bean
MessageListenerAdapter listenerAdapter(MuppetMsgReceiver muppetMsgReceiver){
return new MessageListenerAdapter(muppetMsgReceiver, "receiveMessage");
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setAcknowledgeMode(NONE);
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
}
我的消息接收者class如下:
public class MuppetMsgReceiver {
private String muppetMessage;
private CountDownLatch countDownLatch;
public MuppetMsgReceiver() {
this.countDownLatch = new CountDownLatch(1);
}
public MuppetMsgReceiver(CountDownLatch latch) {
this.countDownLatch = latch;
CountDownLatch getCountDownLatch() {
return countDownLatch;
}
public void receiveMessage(String receivedMessage) {
countDownLatch.countDown();
this.muppetMessage = receivedMessage;
}
public String getMuppetMessage() {
return muppetMessage;
}
}
此完整代码基于 Spring 的 getting started example,但由于从队列中进行非破坏性读取,因此无济于事。
AcknowledgeMode=NONE
意味着代理会在消息 发送 给消费者后立即确认消息,而不是在消费者收到消息时确认消息,因此您观察到的行为对我来说毫无意义.使用这种 ack 模式是很不寻常的,但是消息一旦发送就会消失。这可能是您的代码有问题。 TRACE 日志记录将帮助您了解发生了什么。
我需要帮助来实现 SimpleMessageListenerContainer,只要消息接收者读取消息,它就会以非事务方式删除消息。
在我的例子中,无论事务成功或不成功,放入队列的消息都挂在某个地方(不在队列中),并且在从队列读取的每个操作中重复处理。因此放入队列的所有其他消息仍然无法访问,并且每次只重新处理第一个消息。
另一件奇怪的事情是,我在队列中看不到消息 landing/queuing,即 Rabbit 管理控制台上的队列深度从未改变,只是消息速率在每次写入时跳跃到队列中。
下面是我的 Java 配置的代码片段。如果有人能指出这里的错误,将会有很大的帮助:-
@Configuration
@EnableJpaRepositories(basePackages={"com.xxx.muppets"})
public class MuppetMessageConsumerConfig {
private ApplicationContext applicationContext;
@Value("${rabbit.queue.name}")
private String queueName;
@Autowired
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("spring-boot-exchange");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(queueName);
}
@Bean
MuppetMsgReceiver muppetMsgReceiver(){
return new MuppetMsgReceiver();
}
@Bean
MessageListenerAdapter listenerAdapter(MuppetMsgReceiver muppetMsgReceiver){
return new MessageListenerAdapter(muppetMsgReceiver, "receiveMessage");
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setAcknowledgeMode(NONE);
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
}
我的消息接收者class如下:
public class MuppetMsgReceiver {
private String muppetMessage;
private CountDownLatch countDownLatch;
public MuppetMsgReceiver() {
this.countDownLatch = new CountDownLatch(1);
}
public MuppetMsgReceiver(CountDownLatch latch) {
this.countDownLatch = latch;
CountDownLatch getCountDownLatch() {
return countDownLatch;
}
public void receiveMessage(String receivedMessage) {
countDownLatch.countDown();
this.muppetMessage = receivedMessage;
}
public String getMuppetMessage() {
return muppetMessage;
}
}
此完整代码基于 Spring 的 getting started example,但由于从队列中进行非破坏性读取,因此无济于事。
AcknowledgeMode=NONE
意味着代理会在消息 发送 给消费者后立即确认消息,而不是在消费者收到消息时确认消息,因此您观察到的行为对我来说毫无意义.使用这种 ack 模式是很不寻常的,但是消息一旦发送就会消失。这可能是您的代码有问题。 TRACE 日志记录将帮助您了解发生了什么。