Spring - Rabbitmq 侦听器需要在数据库更改期间暂停和恢复
Spring - Rabbitmq Listener needs to paused and resumed during database change
嘿,要求是在更改后端表期间暂停 rabbitmq 侦听器处理消息。此更改仅限于我的应用程序,因此不想关闭整个 rabbitmq 实例。一旦这个过程完成,我想再次启动听众。
我面临的问题
我有 2 个侦听器连接到 2 个单独的队列,共享一个 'consumerconnectionFactory'。当我终止连接时,只有没有任何开放通道的连接被终止,当我恢复连接时,我得到了一个之前没有的额外连接。 你能帮忙吗
我在下面分享我的 java 配置。
@Bean
public SimpleMessageListenerContainer auditMessageListenerContainer(AuditMessageListener auditMessageListener)
{
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
container.setQueueNames(messagingAuditQueue);
container.setMessageListener(auditMessageListener);
container.setMaxConcurrentConsumers(5);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setDefaultRequeueRejected(false);
container.setMissingQueuesFatal(false);
container.setForceCloseChannel(true);
container.setExclusive(false);
return container;
}
@Bean
public SimpleMessageListenerContainer accessMessageListenerContainer(AccessLogListener accessLogListener)
{
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
container.setQueueNames(accessAuditQueue);
container.setMessageListener(accessLogListener);
container.setMaxConcurrentConsumers(5);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setDefaultRequeueRejected(false);
container.setMissingQueuesFatal(false);
container.setForceCloseChannel(true);
container.setExclusive(false);
return container;
}
这就是我为监听器进行 Java 配置的方式。
下面是启动和停止监听器的 RestController
@RestController
@RequestMapping(MESSAGE_AUDIT_ROOT)
public class RestartController {
@Autowired
private List<MessageListenerContainer> listenerContainers;
@Autowired
private List<ConnectionFactory> connectionFactories;
@GetMapping("/stop")
public String stopMessageListenerContainer() {
connectionFactories.forEach(conFactory -> {
CachingConnectionFactory cConFactory = (CachingConnectionFactory) conFactory;
cConFactory.resetConnection();
});
listenerContainers.forEach(container -> {
SimpleMessageListenerContainer smlc = (SimpleMessageListenerContainer) container;
smlc.shutdown();
});
listenerContainers.forEach(container -> System.out
.println("Container: " + container.toString() + "is Running ?" + container.isRunning()));
return "done - stop";
}
@GetMapping("/start")
public String startMessageListenerContainer() {
connectionFactories.forEach(conFactory -> {
CachingConnectionFactory cConFactory = (CachingConnectionFactory) conFactory;
cConFactory.createConnection();
});
listenerContainers.forEach(container -> {
SimpleMessageListenerContainer smlc = (SimpleMessageListenerContainer) container;
smlc.start();
});
listenerContainers.forEach(container -> System.out
.println("Container: " + container.toString() + "is Running ?" + container.isRunning()));
return "done - start";
}
}
以下是我在本地看到的行为的图片。
1.初始连接列表
- 连接停止时 Rest call
2.1 队列连接仍然有效
3.连接开始时Rest Call
使用默认缓存模式 (CHANNEL),任何时候都应该只有一个连接,除非您将 RabbitTemplate
配置为 usePublisherConnection
设置为 true,在这种情况下,连接名称将是 api-audit.publisher
.
因为您有两个与名字 api-audit
的联系,所以发生了一些非常奇怪的事情。我怀疑您以某种方式加载了两个连接工厂,也许一个在子应用程序上下文中?在单个应用程序上下文中不能有两个同名的 bean。
即您正在呼叫其中一个 resetConnection
而不是另一个。
我建议你在createConnection
下一个断点,看看谁在使用第二个CF。
顺便说一句,你真的应该在容器停止后重新设置连接;否则容器将进入恢复模式并可能重新打开连接,具体取决于时间。
嘿,要求是在更改后端表期间暂停 rabbitmq 侦听器处理消息。此更改仅限于我的应用程序,因此不想关闭整个 rabbitmq 实例。一旦这个过程完成,我想再次启动听众。
我面临的问题 我有 2 个侦听器连接到 2 个单独的队列,共享一个 'consumerconnectionFactory'。当我终止连接时,只有没有任何开放通道的连接被终止,当我恢复连接时,我得到了一个之前没有的额外连接。 你能帮忙吗
我在下面分享我的 java 配置。
@Bean
public SimpleMessageListenerContainer auditMessageListenerContainer(AuditMessageListener auditMessageListener)
{
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
container.setQueueNames(messagingAuditQueue);
container.setMessageListener(auditMessageListener);
container.setMaxConcurrentConsumers(5);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setDefaultRequeueRejected(false);
container.setMissingQueuesFatal(false);
container.setForceCloseChannel(true);
container.setExclusive(false);
return container;
}
@Bean
public SimpleMessageListenerContainer accessMessageListenerContainer(AccessLogListener accessLogListener)
{
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
container.setQueueNames(accessAuditQueue);
container.setMessageListener(accessLogListener);
container.setMaxConcurrentConsumers(5);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setDefaultRequeueRejected(false);
container.setMissingQueuesFatal(false);
container.setForceCloseChannel(true);
container.setExclusive(false);
return container;
}
这就是我为监听器进行 Java 配置的方式。
下面是启动和停止监听器的 RestController
@RestController
@RequestMapping(MESSAGE_AUDIT_ROOT)
public class RestartController {
@Autowired
private List<MessageListenerContainer> listenerContainers;
@Autowired
private List<ConnectionFactory> connectionFactories;
@GetMapping("/stop")
public String stopMessageListenerContainer() {
connectionFactories.forEach(conFactory -> {
CachingConnectionFactory cConFactory = (CachingConnectionFactory) conFactory;
cConFactory.resetConnection();
});
listenerContainers.forEach(container -> {
SimpleMessageListenerContainer smlc = (SimpleMessageListenerContainer) container;
smlc.shutdown();
});
listenerContainers.forEach(container -> System.out
.println("Container: " + container.toString() + "is Running ?" + container.isRunning()));
return "done - stop";
}
@GetMapping("/start")
public String startMessageListenerContainer() {
connectionFactories.forEach(conFactory -> {
CachingConnectionFactory cConFactory = (CachingConnectionFactory) conFactory;
cConFactory.createConnection();
});
listenerContainers.forEach(container -> {
SimpleMessageListenerContainer smlc = (SimpleMessageListenerContainer) container;
smlc.start();
});
listenerContainers.forEach(container -> System.out
.println("Container: " + container.toString() + "is Running ?" + container.isRunning()));
return "done - start";
}
}
以下是我在本地看到的行为的图片。
1.初始连接列表
- 连接停止时 Rest call
2.1 队列连接仍然有效
使用默认缓存模式 (CHANNEL),任何时候都应该只有一个连接,除非您将 RabbitTemplate
配置为 usePublisherConnection
设置为 true,在这种情况下,连接名称将是 api-audit.publisher
.
因为您有两个与名字 api-audit
的联系,所以发生了一些非常奇怪的事情。我怀疑您以某种方式加载了两个连接工厂,也许一个在子应用程序上下文中?在单个应用程序上下文中不能有两个同名的 bean。
即您正在呼叫其中一个 resetConnection
而不是另一个。
我建议你在createConnection
下一个断点,看看谁在使用第二个CF。
顺便说一句,你真的应该在容器停止后重新设置连接;否则容器将进入恢复模式并可能重新打开连接,具体取决于时间。