MessageListener.onMessage 在 RabbitMQ 上不断被调用 Spring Boot

MessageListener.onMessage is getting called continuously on RabbitMQ with Spring Boot

我有 MessageListener.onMessage 线程休眠。我正在模拟 onMessage 的实际处理时间 方法将采取上述线程睡眠。但是,我注意到它会连续多次调用剩余的消息,直到它们被 onMessage 方法处理为止。我认为这是一种低效率。

队列中的实际消息数:1000

输出 运行 次点击

onMessage<<15656
onMessage<<15657
onMessage<<15658
onMessage<<15659
onMessage<<15660
onMessage<<15661
onMessage<<15662
onMessage<<15663

代码块

@Service
class ThreadPooledMessageListener implements MessageListener {
@Autowired
TaskExecutor threadPoolTaskExecutor;

AtomicInteger processedCount = new AtomicInteger();

@Override
public void onMessage(Message message) {
    System.out.println("onMessage<<" + processedCount.incrementAndGet());
    threadPoolTaskExecutor.execute(new MessageProcessor(message));

}
}

class MessageProcessor implements Runnable {
Message processingMessage;

public MessageProcessor(Message message) {
    this.processingMessage = message;
}

@Override
public void run() {
    System.out.println("================================"+ Thread.currentThread().getName());
    System.out.println(processingMessage);
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("================================");
}
}

有什么可能的修复方法。


正如@Gary Russell 指出的那样;问题是我在我的代码中使用了非 spring 托管容器 SimpleMessageListenerContainer。使用 spring 托管 bean 修复它并在那里定义并发。按预期工作。 固定代码段

    @Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(queue);
    container.setMessageListener(threadPooledMessageListener);
    container.setConcurrentConsumers(4);
    container.start();
    return container;
}

>I see this as an inefficiency.

不清楚你的意思。由于您正在将消息的处理移交给另一个线程,因此侦听器会立即退出,当然,下一条消息也会被传递。

如果发生故障,这将有丢失消息的风险。

如果你正在尝试实现并发;最好设置容器 concurrentConsumers 属性 而不是在侦听器中进行自己的线程管理。容器将为您管理消费者。