如何在 Spring AMQP 中使用拦截器

How to use interceptor with Spring AMQP

有没有办法在调用 template.convertAndSend 之后拦截消息,然后再将消息传送到 RabbitMQ。

还有什么方法可以在到达处理程序之前拦截消息?

我可以使用发布者的后处理器处理消息,但更喜欢使用拦截器。

public class TestPostProcessor implements MessagePostProcessor {

    @Autowired
    Tracer defaultTracer;

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        //.....
        //.... 
        return message;
    }
}

有什么建议吗?

MessagePostProcessor 是拦截器的一种形式。

有两种方法可以调用其中一种 - 使用以 MPP 作为参数的重载 convertAndSend() 方法之一,或者使用 setBeforePublishPostProcessors()RabbitTemplate 添加一个或多个方法.

您还可以使用 setAfterReceivePostProcessors() 拦截接收到的消息,该方法在接收到的消息从 receive() 方法返回之前调用。

侦听器容器在接收之后和通过其 setAfterReceivePostProcessors() 方法传递给侦听器之前也支持 MPP。

如果您想在 application.properties 文件中继续使用 spring 启动属性(来自 org.springframework.boot.autoconfigure.amqp.RabbitProperties),您可以提供自己的 RabbitListenerContainerFactory :

  @Bean
  public CustomRabbitListenerContainerFactory rabbitListenerContainerFactory(
      SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory, MyContextMessageProcessor messageProcessor) {
    CustomRabbitListenerContainerFactory factory = new CustomRabbitListenerContainerFactory(messageProcessor);
    configurer.configure(factory, connectionFactory);
    return factory;
  }

CustomRabbitListenerContainerFactory.java :

public class CustomRabbitListenerContainerFactory
  extends org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory {

  private MessagePostProcessor[] messagePostProcessor;

  public CustomRabbitListenerContainerFactory(MessagePostProcessor... messagePostProcessor) {
    super();
    this.messagePostProcessor = messagePostProcessor;
  }

  @Override
  protected void initializeContainer(SimpleMessageListenerContainer instance, RabbitListenerEndpoint endpoint) {
    super.initializeContainer(instance, endpoint);
    instance.addAfterReceivePostProcessors(messagePostProcessor);
  }
}