Spring Amqp - 在数据库批处理时动态地 Disable/Enable RabbitListener 运行
Spring Amqp - Dynamically Disable/Enable RabbitListener while DB Batch running
我们有这样的场景,消息从 Amqp 队列中拉出并写入我们的数据库 - 但是,在计划的数据库批处理运行期间,数据库不可用,我们无法写入,因此我们希望消息保留在 windows 期间排队。
我正在尝试确定处理此问题的最佳方法 - 只需在 @RabbitListener 方法中抛出异常以(重复)重新排队消息或使用 Spring 调度程序尝试 stop/start 侦听器(我看到 SimpleMessageListenerContainer 有 stop/start 方法)。
关于最佳(或更好)方法有什么建议吗?
要考虑的另一种方法是侦听器的回复功能。
查看 AbstractRabbitListenerContainerFactory
JavaDocs:
/**
* Set a {@link RetryTemplate} to use when sending replies; added to each message
* listener adapter.
* @param retryTemplate the template.
* @since 2.0.6
* @see #setReplyRecoveryCallback(RecoveryCallback)
* @see AbstractAdaptableMessageListener#setRetryTemplate(RetryTemplate)
*/
public void setRetryTemplate(RetryTemplate retryTemplate) {
this.retryTemplate = retryTemplate;
}
/**
* Set a {@link RecoveryCallback} to invoke when retries are exhausted. Added to each
* message listener adapter. Only used if a {@link #setRetryTemplate(RetryTemplate)
* retryTemplate} is provided.
* @param recoveryCallback the recovery callback.
* @since 2.0.6
* @see #setRetryTemplate(RetryTemplate)
* @see AbstractAdaptableMessageListener#setRecoveryCallback(RecoveryCallback)
*/
public void setReplyRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
this.recoveryCallback = recoveryCallback;
}
你是对的:你也可以使用 start()/stop()
。
出于这个原因,您需要注入一个 RabbitListenerEndpointRegistry
bean 并使用它的:
/**
* Return the {@link MessageListenerContainer} with the specified id or
* {@code null} if no such container exists.
* @param id the id of the container
* @return the container or {@code null} if no container with that id exists
* @see RabbitListenerEndpoint#getId()
* @see #getListenerContainerIds()
*/
public MessageListenerContainer getListenerContainer(String id) {
根据您的逻辑,为您的 @RabbitListener
和 stop()
或 start()
访问适当的侦听器容器。
我们有这样的场景,消息从 Amqp 队列中拉出并写入我们的数据库 - 但是,在计划的数据库批处理运行期间,数据库不可用,我们无法写入,因此我们希望消息保留在 windows 期间排队。
我正在尝试确定处理此问题的最佳方法 - 只需在 @RabbitListener 方法中抛出异常以(重复)重新排队消息或使用 Spring 调度程序尝试 stop/start 侦听器(我看到 SimpleMessageListenerContainer 有 stop/start 方法)。
关于最佳(或更好)方法有什么建议吗?
要考虑的另一种方法是侦听器的回复功能。
查看 AbstractRabbitListenerContainerFactory
JavaDocs:
/**
* Set a {@link RetryTemplate} to use when sending replies; added to each message
* listener adapter.
* @param retryTemplate the template.
* @since 2.0.6
* @see #setReplyRecoveryCallback(RecoveryCallback)
* @see AbstractAdaptableMessageListener#setRetryTemplate(RetryTemplate)
*/
public void setRetryTemplate(RetryTemplate retryTemplate) {
this.retryTemplate = retryTemplate;
}
/**
* Set a {@link RecoveryCallback} to invoke when retries are exhausted. Added to each
* message listener adapter. Only used if a {@link #setRetryTemplate(RetryTemplate)
* retryTemplate} is provided.
* @param recoveryCallback the recovery callback.
* @since 2.0.6
* @see #setRetryTemplate(RetryTemplate)
* @see AbstractAdaptableMessageListener#setRecoveryCallback(RecoveryCallback)
*/
public void setReplyRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
this.recoveryCallback = recoveryCallback;
}
你是对的:你也可以使用 start()/stop()
。
出于这个原因,您需要注入一个 RabbitListenerEndpointRegistry
bean 并使用它的:
/**
* Return the {@link MessageListenerContainer} with the specified id or
* {@code null} if no such container exists.
* @param id the id of the container
* @return the container or {@code null} if no container with that id exists
* @see RabbitListenerEndpoint#getId()
* @see #getListenerContainerIds()
*/
public MessageListenerContainer getListenerContainer(String id) {
根据您的逻辑,为您的 @RabbitListener
和 stop()
或 start()
访问适当的侦听器容器。