如何创建@RabbitListener 是幂等的
How to create @RabbitListener to be idempotent
我们的配置是:1...n 个具有共享数据库的消息接收器。
消息应该只被处理一次。
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "message-queue", durable = "true"),
exchange = @Exchange(value = TOPIC_EXCHANGE, type = "topic", durable = "true"),
key = MESSAGE_QUEUE1_RK)
)
public void receiveMessage(CustomMessage message) throws InterruptedException {
System.out.println("I have been received = " + message);
}
我们要保证消息将被处理一次,我们有一个消息存储库,其中包含已处理消息的 ID。
是否可以在 receiveMessage 之前挂钩此检查?
我们尝试查看带有 rabbitTemplate 的 MessagePostProcessor,但似乎没有用。
关于如何做到这一点有什么建议吗?
我们尝试使用 MethodInterceptor 并且这有效,但是非常难看。
谢谢
找到解决方案 - 感谢 Gary
我创建了一个实现 SmartLifecycle
的 MessagePostProcessorInjector
在启动时,我检查每个容器,如果它是 AbstractMessageListenerContainer
添加一个客户 MessagePostProccesser
和一个自定义 ErrorHandler
寻找特定类型的异常并丢弃它们(其他转发到 defaultErrorHandler)
由于我们使用的是 DLQ,我发现抛出异常或设置为 null 不会真正起作用。
我将在 MPP 之后发出一个 pull request 来忽略空消息。
有趣; SimpleMessageListenerContainer
确实有一个 属性 afterReceivePostProcessors
( 目前无法通过注释使用的侦听器容器工厂获得,但可以稍后注入 ) .
但是,那些 post 处理器无济于事,因为我们仍然调用侦听器。
请随时为两件事打开一个JIRA Improvement Issue:
在侦听器容器工厂中公开 afterReceivePostProcessors
- 如果 post 处理器 return 为 null,则跳过调用侦听器方法。
(更正,属性确实是工厂暴露的)
编辑
它是如何工作的...
上下文初始化期间...
- 对于 bean post 处理器检测到的每个注解,都会在
RabbitListenerEndpointRegistry
中创建并注册容器
- 在上下文初始化接近尾声时,注册表被
start()
ed 并启动为 autoStartup
(默认)配置的所有容器。
要在容器启动之前对其进行进一步配置(例如,对于容器工厂当前未公开的属性),请将 autoStartup
设置为 false
。
然后您可以从注册表中获取容器(作为集合或通过 id
)。只需 @Autowire
您应用中的注册表。
将容器转换为 SimpleMessageListenerContainer
(或者如果使用 Spring AMQP 2.0 或更高版本并且您正在使用它的工厂,则转换为 DirectMessageListenerContainer
)。
设置附加属性(如afterReceiveMessagePostProcessors
);然后 start()
容器。
注意:在我们增强容器以允许 return null
的 MPP 之前,一种可能的替代方法是从 MPP 中抛出 AmqpRejectAndDontRequeueException
。但是,如果您配置了 DLQ,这可能不是您想要的。
当消息重复时,从 DuplicateChecking MPP 的 postProcessMessage() 中抛出从 ImmediateAcknowledgeAmqpException 扩展的异常也不会将消息传递给 rabbit Listener。
我们的配置是:1...n 个具有共享数据库的消息接收器。 消息应该只被处理一次。
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "message-queue", durable = "true"),
exchange = @Exchange(value = TOPIC_EXCHANGE, type = "topic", durable = "true"),
key = MESSAGE_QUEUE1_RK)
)
public void receiveMessage(CustomMessage message) throws InterruptedException {
System.out.println("I have been received = " + message);
}
我们要保证消息将被处理一次,我们有一个消息存储库,其中包含已处理消息的 ID。 是否可以在 receiveMessage 之前挂钩此检查? 我们尝试查看带有 rabbitTemplate 的 MessagePostProcessor,但似乎没有用。
关于如何做到这一点有什么建议吗? 我们尝试使用 MethodInterceptor 并且这有效,但是非常难看。 谢谢
找到解决方案 - 感谢 Gary
我创建了一个实现 SmartLifecycle
的 MessagePostProcessorInjector
在启动时,我检查每个容器,如果它是 AbstractMessageListenerContainer
添加一个客户 MessagePostProccesser
和一个自定义 ErrorHandler
寻找特定类型的异常并丢弃它们(其他转发到 defaultErrorHandler)
由于我们使用的是 DLQ,我发现抛出异常或设置为 null 不会真正起作用。
我将在 MPP 之后发出一个 pull request 来忽略空消息。
有趣; SimpleMessageListenerContainer
确实有一个 属性 afterReceivePostProcessors
( 目前无法通过注释使用的侦听器容器工厂获得,但可以稍后注入 ) .
但是,那些 post 处理器无济于事,因为我们仍然调用侦听器。
请随时为两件事打开一个JIRA Improvement Issue:
在侦听器容器工厂中公开afterReceivePostProcessors
- 如果 post 处理器 return 为 null,则跳过调用侦听器方法。
(更正,属性确实是工厂暴露的)
编辑
它是如何工作的...
上下文初始化期间...
- 对于 bean post 处理器检测到的每个注解,都会在
RabbitListenerEndpointRegistry
中创建并注册容器
- 在上下文初始化接近尾声时,注册表被
start()
ed 并启动为autoStartup
(默认)配置的所有容器。
要在容器启动之前对其进行进一步配置(例如,对于容器工厂当前未公开的属性),请将 autoStartup
设置为 false
。
然后您可以从注册表中获取容器(作为集合或通过 id
)。只需 @Autowire
您应用中的注册表。
将容器转换为 SimpleMessageListenerContainer
(或者如果使用 Spring AMQP 2.0 或更高版本并且您正在使用它的工厂,则转换为 DirectMessageListenerContainer
)。
设置附加属性(如afterReceiveMessagePostProcessors
);然后 start()
容器。
注意:在我们增强容器以允许 return null
的 MPP 之前,一种可能的替代方法是从 MPP 中抛出 AmqpRejectAndDontRequeueException
。但是,如果您配置了 DLQ,这可能不是您想要的。
当消息重复时,从 DuplicateChecking MPP 的 postProcessMessage() 中抛出从 ImmediateAcknowledgeAmqpException 扩展的异常也不会将消息传递给 rabbit Listener。