Spring AMQP:将消息处理器添加到自动配置的“RabbitTemplate”
Spring AMQP: Adding Message Processors To Auto Configured `RabbitTemplate`
我正在尝试调用 RabbitTemplate#addBeforePublishPostProcessors
和 RabbitTemplate#addAfterReceivePostProcessors
,但不会过多干扰 Spring 的自动配置。
我正在尝试这样做,但我的 MessagePostProcessor
没有触发(我没有看到正在发布的消息中的 'test_header')。
@EventListener
void test(ApplicationPreparedEvent event) {
ConfigurableApplicationContext applicationContext = event.getApplicationContext();
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.addBeforePublishPostProcessors(new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("test_header", "test_header_value");
return message;
}
});
}
为此挂钩的正确位置是什么?
我也试过听 ApplicationStartedEvent
。
更新:
根据 Gary 的建议在我的 @Configuration
class 中添加了这个 bean:
@Bean
public static BeanPostProcessor rabbitTemplatePostProcessor() {
return new BeanPostProcessor() {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if(bean instanceof RabbitTemplate) {
RabbitTemplate rabbitTemplate = (RabbitTemplate) bean;
rabbitTemplate.addBeforePublishPostProcessors(m -> {
m.getMessageProperties()
.setHeader(MESSAGE_PUBLISHED_TIME, currentTimeMillis());
return m;
});
rabbitTemplate.addAfterReceivePostProcessors(m -> {
m.getMessageProperties().setHeader(MESSAGE_RECEIVED_TIME, currentTimeMillis());
return m;
});
}
return bean;
}
};
}
对于正在使用 @RabbitListener
和 @SendTo
寻找如何执行此操作的答案的任何人,请参阅 Gary 对其答案的编辑。
使用 BeanPostProcessor
.
@SpringBootApplication
public class So56155062Application {
public static void main(String[] args) {
SpringApplication.run(So56155062Application.class, args);
}
@Bean
public static BeanPostProcessor bpp() {
return new BeanPostProcessor() {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof RabbitTemplate) {
((RabbitTemplate) bean).setBeforePublishPostProcessors(m -> {
m.getMessageProperties().setHeader("foo", "baz");
return m;
});
}
return bean;
}
};
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> template.convertAndSend("foo", "bar");
}
@RabbitListener(queues = "foo")
public void listen(String in, @Header("foo") String header) {
System.out.println(in + header);
}
}
注意 static
修饰符
编辑
模板不用于回复; post 处理器转而进入容器工厂。
@SpringBootApplication
public class So56155062Application {
public static void main(String[] args) {
SpringApplication.run(So56155062Application.class, args);
}
@Bean
public static BeanPostProcessor bpp() {
return new BeanPostProcessor() {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof RabbitTemplate) {
((RabbitTemplate) bean).setBeforePublishPostProcessors(m -> {
m.getMessageProperties().setHeader("foo", "baz");
m.getMessageProperties().setReplyTo("bar");
System.out.println("Adding header to outgoing message with payload: " + new String(m.getBody()));
return m;
});
}
else if (bean instanceof AbstractRabbitListenerContainerFactory) {
((AbstractRabbitListenerContainerFactory<?>) bean).setAfterReceivePostProcessors(m -> {
m.getMessageProperties().setHeader("qux", "fiz");
System.out.println("Adding header to incoming message with payload: " + new String(m.getBody()));
return m;
});
((AbstractRabbitListenerContainerFactory<?>) bean).setBeforeSendReplyPostProcessors(m -> {
m.getMessageProperties().setHeader("foo", "baz");
m.getMessageProperties().setReplyTo("bar");
System.out.println(
"Adding header to outgoing reply message with payload: " + new String(m.getBody()));
return m;
});
}
return bean;
}
};
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> template.convertAndSend("foo", "bar");
}
@RabbitListener(queues = "foo")
@SendTo
public String listen1(String in, @Header("foo") String header) {
System.out.println(in + header);
return in.toUpperCase();
}
@RabbitListener(queues = "bar")
public void listen2(String in) {
System.out.println(in);
}
}
和
Adding header to outgoing message with payload: bar
Adding header to incoming message with payload: bar
barbaz
Adding header to outgoing reply message with payload: BAR
Adding header to incoming message with payload: BAR
BAR
我正在尝试调用 RabbitTemplate#addBeforePublishPostProcessors
和 RabbitTemplate#addAfterReceivePostProcessors
,但不会过多干扰 Spring 的自动配置。
我正在尝试这样做,但我的 MessagePostProcessor
没有触发(我没有看到正在发布的消息中的 'test_header')。
@EventListener
void test(ApplicationPreparedEvent event) {
ConfigurableApplicationContext applicationContext = event.getApplicationContext();
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.addBeforePublishPostProcessors(new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("test_header", "test_header_value");
return message;
}
});
}
为此挂钩的正确位置是什么?
我也试过听 ApplicationStartedEvent
。
更新:
根据 Gary 的建议在我的 @Configuration
class 中添加了这个 bean:
@Bean
public static BeanPostProcessor rabbitTemplatePostProcessor() {
return new BeanPostProcessor() {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if(bean instanceof RabbitTemplate) {
RabbitTemplate rabbitTemplate = (RabbitTemplate) bean;
rabbitTemplate.addBeforePublishPostProcessors(m -> {
m.getMessageProperties()
.setHeader(MESSAGE_PUBLISHED_TIME, currentTimeMillis());
return m;
});
rabbitTemplate.addAfterReceivePostProcessors(m -> {
m.getMessageProperties().setHeader(MESSAGE_RECEIVED_TIME, currentTimeMillis());
return m;
});
}
return bean;
}
};
}
对于正在使用 @RabbitListener
和 @SendTo
寻找如何执行此操作的答案的任何人,请参阅 Gary 对其答案的编辑。
使用 BeanPostProcessor
.
@SpringBootApplication
public class So56155062Application {
public static void main(String[] args) {
SpringApplication.run(So56155062Application.class, args);
}
@Bean
public static BeanPostProcessor bpp() {
return new BeanPostProcessor() {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof RabbitTemplate) {
((RabbitTemplate) bean).setBeforePublishPostProcessors(m -> {
m.getMessageProperties().setHeader("foo", "baz");
return m;
});
}
return bean;
}
};
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> template.convertAndSend("foo", "bar");
}
@RabbitListener(queues = "foo")
public void listen(String in, @Header("foo") String header) {
System.out.println(in + header);
}
}
注意 static
修饰符
编辑
模板不用于回复; post 处理器转而进入容器工厂。
@SpringBootApplication
public class So56155062Application {
public static void main(String[] args) {
SpringApplication.run(So56155062Application.class, args);
}
@Bean
public static BeanPostProcessor bpp() {
return new BeanPostProcessor() {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof RabbitTemplate) {
((RabbitTemplate) bean).setBeforePublishPostProcessors(m -> {
m.getMessageProperties().setHeader("foo", "baz");
m.getMessageProperties().setReplyTo("bar");
System.out.println("Adding header to outgoing message with payload: " + new String(m.getBody()));
return m;
});
}
else if (bean instanceof AbstractRabbitListenerContainerFactory) {
((AbstractRabbitListenerContainerFactory<?>) bean).setAfterReceivePostProcessors(m -> {
m.getMessageProperties().setHeader("qux", "fiz");
System.out.println("Adding header to incoming message with payload: " + new String(m.getBody()));
return m;
});
((AbstractRabbitListenerContainerFactory<?>) bean).setBeforeSendReplyPostProcessors(m -> {
m.getMessageProperties().setHeader("foo", "baz");
m.getMessageProperties().setReplyTo("bar");
System.out.println(
"Adding header to outgoing reply message with payload: " + new String(m.getBody()));
return m;
});
}
return bean;
}
};
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> template.convertAndSend("foo", "bar");
}
@RabbitListener(queues = "foo")
@SendTo
public String listen1(String in, @Header("foo") String header) {
System.out.println(in + header);
return in.toUpperCase();
}
@RabbitListener(queues = "bar")
public void listen2(String in) {
System.out.println(in);
}
}
和
Adding header to outgoing message with payload: bar
Adding header to incoming message with payload: bar
barbaz
Adding header to outgoing reply message with payload: BAR
Adding header to incoming message with payload: BAR
BAR