如何在 Spring AMQP 中使用拦截器
How to use interceptor with Spring AMQP
有没有办法在调用 template.convertAndSend 之后拦截消息,然后再将消息传送到 RabbitMQ。
还有什么方法可以在到达处理程序之前拦截消息?
我可以使用发布者的后处理器处理消息,但更喜欢使用拦截器。
public class TestPostProcessor implements MessagePostProcessor {
@Autowired
Tracer defaultTracer;
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//.....
//....
return message;
}
}
有什么建议吗?
MessagePostProcessor
是拦截器的一种形式。
有两种方法可以调用其中一种 - 使用以 MPP 作为参数的重载 convertAndSend()
方法之一,或者使用 setBeforePublishPostProcessors()
向 RabbitTemplate
添加一个或多个方法.
您还可以使用 setAfterReceivePostProcessors()
拦截接收到的消息,该方法在接收到的消息从 receive()
方法返回之前调用。
侦听器容器在接收之后和通过其 setAfterReceivePostProcessors()
方法传递给侦听器之前也支持 MPP。
如果您想在 application.properties 文件中继续使用 spring 启动属性(来自 org.springframework.boot.autoconfigure.amqp.RabbitProperties),您可以提供自己的 RabbitListenerContainerFactory :
@Bean
public CustomRabbitListenerContainerFactory rabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory, MyContextMessageProcessor messageProcessor) {
CustomRabbitListenerContainerFactory factory = new CustomRabbitListenerContainerFactory(messageProcessor);
configurer.configure(factory, connectionFactory);
return factory;
}
CustomRabbitListenerContainerFactory.java :
public class CustomRabbitListenerContainerFactory
extends org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory {
private MessagePostProcessor[] messagePostProcessor;
public CustomRabbitListenerContainerFactory(MessagePostProcessor... messagePostProcessor) {
super();
this.messagePostProcessor = messagePostProcessor;
}
@Override
protected void initializeContainer(SimpleMessageListenerContainer instance, RabbitListenerEndpoint endpoint) {
super.initializeContainer(instance, endpoint);
instance.addAfterReceivePostProcessors(messagePostProcessor);
}
}
有没有办法在调用 template.convertAndSend 之后拦截消息,然后再将消息传送到 RabbitMQ。
还有什么方法可以在到达处理程序之前拦截消息?
我可以使用发布者的后处理器处理消息,但更喜欢使用拦截器。
public class TestPostProcessor implements MessagePostProcessor {
@Autowired
Tracer defaultTracer;
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//.....
//....
return message;
}
}
有什么建议吗?
MessagePostProcessor
是拦截器的一种形式。
有两种方法可以调用其中一种 - 使用以 MPP 作为参数的重载 convertAndSend()
方法之一,或者使用 setBeforePublishPostProcessors()
向 RabbitTemplate
添加一个或多个方法.
您还可以使用 setAfterReceivePostProcessors()
拦截接收到的消息,该方法在接收到的消息从 receive()
方法返回之前调用。
侦听器容器在接收之后和通过其 setAfterReceivePostProcessors()
方法传递给侦听器之前也支持 MPP。
如果您想在 application.properties 文件中继续使用 spring 启动属性(来自 org.springframework.boot.autoconfigure.amqp.RabbitProperties),您可以提供自己的 RabbitListenerContainerFactory :
@Bean
public CustomRabbitListenerContainerFactory rabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory, MyContextMessageProcessor messageProcessor) {
CustomRabbitListenerContainerFactory factory = new CustomRabbitListenerContainerFactory(messageProcessor);
configurer.configure(factory, connectionFactory);
return factory;
}
CustomRabbitListenerContainerFactory.java :
public class CustomRabbitListenerContainerFactory
extends org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory {
private MessagePostProcessor[] messagePostProcessor;
public CustomRabbitListenerContainerFactory(MessagePostProcessor... messagePostProcessor) {
super();
this.messagePostProcessor = messagePostProcessor;
}
@Override
protected void initializeContainer(SimpleMessageListenerContainer instance, RabbitListenerEndpoint endpoint) {
super.initializeContainer(instance, endpoint);
instance.addAfterReceivePostProcessors(messagePostProcessor);
}
}