如何创建@RabbitListener 是幂等的

How to create @RabbitListener to be idempotent

我们的配置是:1...n 个具有共享数据库的消息接收器。 消息应该只被处理一次。

   @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "message-queue", durable = "true"),
            exchange = @Exchange(value = TOPIC_EXCHANGE, type = "topic", durable = "true"),
            key = MESSAGE_QUEUE1_RK)
    )
    public void receiveMessage(CustomMessage message) throws InterruptedException {
        System.out.println("I have been received = " + message);
    }

我们要保证消息将被处理一次,我们有一个消息存储库,其中包含已处理消息的 ID。 是否可以在 receiveMessage 之前挂钩此检查? 我们尝试查看带有 rabbitTemplate 的 MessagePostProcessor,但似乎没有用。

关于如何做到这一点有什么建议吗? 我们尝试使用 MethodInterceptor 并且这有效,但是非常难看。 谢谢


找到解决方案 - 感谢 Gary 我创建了一个实现 SmartLifecycleMessagePostProcessorInjector 在启动时,我检查每个容器,如果它是 AbstractMessageListenerContainer 添加一个客户 MessagePostProccesser 和一个自定义 ErrorHandler 寻找特定类型的异常并丢弃它们(其他转发到 defaultErrorHandler) 由于我们使用的是 DLQ,我发现抛出异常或设置为 null 不会真正起作用。

我将在 MPP 之后发出一个 pull request 来忽略空消息。

有趣; SimpleMessageListenerContainer 确实有一个 属性 afterReceivePostProcessors 目前无法通过注释使用的侦听器容器工厂获得,但可以稍后注入 ) .

但是,那些 post 处理器无济于事,因为我们仍然调用侦听器。

请随时为两件事打开一个JIRA Improvement Issue

  1. 在侦听器容器工厂中公开 afterReceivePostProcessors
  2. 如果 post 处理器 return 为 null,则跳过调用侦听器方法。

(更正,属性确实是工厂暴露的)

编辑

它是如何工作的...

上下文初始化期间...

  1. 对于 bean post 处理器检测到的每个注解,都会在 RabbitListenerEndpointRegistry
  2. 中创建并注册容器
  3. 在上下文初始化接近尾声时,注册表被 start()ed 并启动为 autoStartup(默认)配置的所有容器。

要在容器启动之前对其进行进一步配置(例如,对于容器工厂当前未公开的属性),请将 autoStartup 设置为 false

然后您可以从注册表中获取容器(作为集合或通过 id)。只需 @Autowire 您应用中的注册表。

将容器转换为 SimpleMessageListenerContainer(或者如果使用 Spring AMQP 2.0 或更高版本并且您正在使用它的工厂,则转换为 DirectMessageListenerContainer)。

设置附加属性(如afterReceiveMessagePostProcessors);然后 start() 容器。

注意:在我们增强容器以允许 return null 的 MPP 之前,一种可能的替代方法是从 MPP 中抛出 AmqpRejectAndDontRequeueException。但是,如果您配置了 DLQ,这可能不是您想要的。

当消息重复时,从 DuplicateChecking MPP 的 postProcessMessage() 中抛出从 ImmediateAcknowledgeAmqpException 扩展的异常也不会将消息传递给 rabbit Listener。