Spring - Rabbitmq 侦听器需要在数据库更改期间暂停和恢复

Spring - Rabbitmq Listener needs to paused and resumed during database change

嘿,要求是在更改后端表期间暂停 rabbitmq 侦听器处理消息。此更改仅限于我的应用程序,因此不想关闭整个 rabbitmq 实例。一旦这个过程完成,我想再次启动听众。

我面临的问题 我有 2 个侦听器连接到 2 个单独的队列,共享一个 'consumerconnectionFactory'。当我终止连接时,只有没有任何开放通道的连接被终止,当我恢复连接时,我得到了一个之前没有的额外连接。 你能帮忙吗

我在下面分享我的 java 配置。

@Bean
    public SimpleMessageListenerContainer auditMessageListenerContainer(AuditMessageListener auditMessageListener)
    {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
        container.setQueueNames(messagingAuditQueue);
        container.setMessageListener(auditMessageListener);
        container.setMaxConcurrentConsumers(5);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setDefaultRequeueRejected(false);
        container.setMissingQueuesFatal(false);
        container.setForceCloseChannel(true);
        container.setExclusive(false);
        return container;
    }
    @Bean
    public SimpleMessageListenerContainer accessMessageListenerContainer(AccessLogListener accessLogListener)
    {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
        container.setQueueNames(accessAuditQueue);
        container.setMessageListener(accessLogListener);
        container.setMaxConcurrentConsumers(5);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setDefaultRequeueRejected(false);
        container.setMissingQueuesFatal(false);
        container.setForceCloseChannel(true);
        container.setExclusive(false);
        return container;
    }

这就是我为监听器进行 Java 配置的方式。

下面是启动和停止监听器的 RestController

@RestController
@RequestMapping(MESSAGE_AUDIT_ROOT)
public class RestartController {
    @Autowired
    private List<MessageListenerContainer> listenerContainers;

    @Autowired
    private List<ConnectionFactory> connectionFactories;

    @GetMapping("/stop")
    public String stopMessageListenerContainer() {
        connectionFactories.forEach(conFactory -> {
            CachingConnectionFactory cConFactory = (CachingConnectionFactory) conFactory;
            cConFactory.resetConnection();
        });
        listenerContainers.forEach(container -> {
            SimpleMessageListenerContainer smlc = (SimpleMessageListenerContainer) container;
            smlc.shutdown();
        });
        listenerContainers.forEach(container -> System.out
                .println("Container: " + container.toString() + "is Running ?" + container.isRunning()));
        return "done - stop";
    }

    @GetMapping("/start")
    public String startMessageListenerContainer() {
        connectionFactories.forEach(conFactory -> {
            CachingConnectionFactory cConFactory = (CachingConnectionFactory) conFactory;
            cConFactory.createConnection();
        });
        listenerContainers.forEach(container -> {
            SimpleMessageListenerContainer smlc = (SimpleMessageListenerContainer) container;
            smlc.start();
        });
        listenerContainers.forEach(container -> System.out
                .println("Container: " + container.toString() + "is Running ?" + container.isRunning()));
        return "done - start";
    }

}

以下是我在本地看到的行为的图片。 1.初始连接列表

  1. 连接停止时 Rest call

2.1 队列连接仍然有效 3.连接开始时Rest Call

使用默认缓存模式 (CHANNEL),任何时候都应该只有一个连接,除非您将 RabbitTemplate 配置为 usePublisherConnection 设置为 true,在这种情况下,连接名称将是 api-audit.publisher.

因为您有两个与名字 api-audit 的联系,所以发生了一些非常奇怪的事情。我怀疑您以某种方式加载了两个连接工厂,也许一个在子应用程序上下文中?在单个应用程序上下文中不能有两个同名的 bean。

即您正在呼叫其中一个 resetConnection 而不是另一个。

我建议你在createConnection下一个断点,看看谁在使用第二个CF。

顺便说一句,你真的应该在容器停止后重新设置连接;否则容器将进入恢复模式并可能重新打开连接,具体取决于时间。