我们如何使用@RabbitListener 连接到 before/After 消息处理

How do we hook into before/After message processing using @RabbitListener

问题: 我正在从 MessageListener 接口实现迁移到 @RabbitListener。我有这样的逻辑,我在一个 MessageListener 上做 "pre" 和 "post" 消息处理,它被几个 类

继承

示例:

public AbstractMessageListener implements MessageListener {

     @Override
     public void onMessage(Message message) {

          //do some pre message processing

          process(Message message);

          // do some post message processing
     }

     protected abstract void process(Message message);

}

问题:有没有一种方法可以使用@RabbitListener注解实现类似的东西我可以继承pre/post消息处理逻辑而不必重新实现或调用每个子 @RabbitListener 注释中的 pre/post 消息处理,同时为子 @RabbitListener 维护可自定义的方法签名?还是这太贪心了?

示例期望结果:

public class SomeRabbitListenerClass {

    @RabbitListener( id = "listener.mypojo",queues = "${rabbitmq.some.queue}")
   public void listen(@Valid MyPojo myPojo) {
      //...
   }
}

public class SomeOtherRabbitListenerClass {

    @RabbitListener(id = "listener.orders",queues ="${rabbitmq.some.other.queue}")
   public void listen(Order order, @Header("order_type") String orderType) {
      //...
   }
}

这两个@RabbitListener 使用相同的继承 pre/post 消息处理

我看到@RabbitListener 注释中有一个 'containerFactory' 参数,但我已经在配置中声明了一个...而且我真的很确定如何通过自定义实现我想要的继承容器工厂。


更新答案:这就是我最终做的。

建议定义:

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.amqp.core.Message;

/**
 * AOP Around advice wrapper. Every time a message comes in we can do 
 * pre/post processing by using this advice by implementing the before/after methods.
 * @author sjacobs
 *
 */
public class RabbitListenerAroundAdvice implements MethodInterceptor {

    /**
     * place the "AroundAdvice" around each new message being processed.
     */
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {

        Message message = (Message) invocation.getArguments()[1];

        before(message)
        Object result = invocation.proceed();
        after(message);

        return  result;
    }

声明 beans: 在你的 rabbitmq 配置中将建议声明为 Spring bean 并将其传递给 rabbitListenerContainerFactory#setAdviceChain(...)

//...

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory( cachingConnectionFactory() );
        factory.setTaskExecutor(threadPoolTaskExecutor());
        factory.setMessageConverter(jackson2JsonMessageConverter());   

        factory.setAdviceChain(rabbitListenerAroundAdvice());

        return factory;
    }

    @Bean
    public RabbitListenerAroundAdvice rabbitListenerAroundAdvice() {
        return new RabbitListenerAroundAdvice();
    }

// ...

更正

您可以使用 SimpleRabbitListenerContainerFactory 中的建议链将环绕建议应用于为 @RabbitListener 创建的侦听器;两个参数是 ChannelMessage.

如果只需要在调用监听器前进行操作,可以在容器afterReceivePostProcessors.

中添加MessagePostProcessor(s)

这里不可能继承,因为 POJO 方法上的注释处理和 MessageListener 实现是完全不同的故事。

使用 MessageListener 您可以完全控制目标行为和 container

使用注解,您只需要处理 POJO,framework-free 代码。特定的 MessageListener 是在后台创建的。而那个完全基于注释的方法。

我想我们可以使用 Spring AOP 框架来实现您的要求。

查看最近的问题及其对此事的回答:How to write an integration test for @RabbitListener annotation?