Spring RabbitListener 在消息发送后调用一个方法

Spring RabbitListener invoke a method after a message is sent

在我的接收器处理了特定数量的消息后,我需要停止 Rabbit 侦听器并调用一个方法,然后再次启用侦听器。

@RabbitListener(...)
public void sink(Message msg) {
    processMsg();

    if (condition) {
        rabbitListenerEndpointRegistry.stop();
        doTask();
        rabbitListenerEndpointRegistry.start();
    }
}

不幸的是,如果我在 sink 方法中停止侦听器,事务将失败并且消息将返回到队列。我正在寻找一种在事务完成并且侦听器未保存任何消息后调用方法的方法。

  1. Receive a msg and process the msg
  2. Finish the transaction and release the msg <- I don't want to receive any new msgs after this point
  3. If the condition is satisfied stop the listener
  4. Do a long running task
  5. Start the listener

我无法转到手动事务管理,因为它需要对我的代码进行太多更改,而且在执行自定义任务时我无法保留任何消息,因为它是一项很长的 运行 任务,并且我想让其他工作人员在这段时间处理消息。

Rabbit 工厂配置中的

setAfterReceivePostProcessorssetAdviceChain 在我的情况下不起作用,因为在这两种情况下,当侦听器持有消息时将调用该方法。

如果条件满足,您可以启动一个新线程,这将停止侦听器并执行一些任务并启动侦听器。虽然不能保证新线程会在原始消息消耗任何新消息之前停止侦听器 listener.But 它可以大致工作。

您可以拥有一个新队列,您可以在其中发布消息(如 start/stop),而不是手动启动线程,新的监听器可以在将指令消息放入新队列时停止原始监听器比如开始或停止。

第二种方法:

您可以尝试另一种我认为应该可行的方法,而不是将侦听器 setMaxConcurrentConsumers 设置为 0,这应该禁用侦听器,不允许它在不停止侦听器的情况下使用更多消息。在完成你的任务后将其更改为原始值,我猜你的情况是 1。

消息监听器容器似乎不适合您的应用程序;你需要一个 "pull" 模型。

您可以使用 rabbitTemplate.receive()receiveAndConvert() 方法中的一种来按需拉取消息。

如果您在交易之外使用它们,消息将立即被确认; 运行 如果您希望在流程完成后确认消息,请在事务中添加。