我如何在传入消息到达用@RabbitListener 注释的方法之前拦截它们?
How can i intercept incomig messages before they reach methods annotated with @RabbitListener?
我首先为外发消息设置了一个拦截器,该拦截器工作正常,但是
当我尝试拦截消费者中的传入消息时,postProcessMessage 方法
被跳过并且消息到达用@RabbitListener注释的方法,波纹管是我整个过程的代码,我省略了不重要的代码。
制作人
RabbitMQProducerInterceptor
@Component
@Slf4j
public class RabbitMQProducerInterceptor implements MessagePostProcessor {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
log.info("Getting the current HttpServletRequest");
HttpServletRequest req = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes())
.getRequest();
log.info("Extracting the X-REQUEST-ID from the header of the HttpServletRequest");
String XRequestId = req.getHeader(ShareableConstants.X_REQUEST_ID_HEADER);
log.info("Adding X-REQUEST-ID {} to the RabbitMQ Producer Header", XRequestId);
message.getMessageProperties().getHeaders().put(ShareableConstants.X_REQUEST_ID_HEADER, XRequestId);
return message;
}
}
RabbitMQProducerConfig
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setReplyTimeout(60000);
MessagePostProcessor[] processors = {new RabbitMQProducerInterceptor()};
rabbitTemplate.addBeforePublishPostProcessors(processors);
return rabbitTemplate;
}
正在向消费者发送消息
用户制作人
public UserRegistrationResponseDTO register(UserRegistrationDTO userRegistrationDTO) {
log.info("Sending user registration request {}", userRegistrationDTO);
UserRegistrationDTO response = (UserRegistrationDTO) rabbitTemplate
.convertSendAndReceive(ShareableConstants.EXCHANGE_NAME,
ShareableConstants.CREATE_USER_ROUTING_KEY,
userRegistrationDTO);
return UserRegistrationResponseDTO.builder()
.username(response.getUsername())
.id(response.getId())
.createdAt(response.getCreatedAt()).build();
}
消费者
RabbitMQConsumerConfig
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
MessagePostProcessor[] processors = {new RabbitMQConsumerInterceptor()};
rabbitTemplate.setAfterReceivePostProcessors(processors);
return rabbitTemplate;
}
RabbitMQConsumerInterceptor
@Component
public class RabbitMQConsumerInterceptor implements MessagePostProcessor {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
String XRequestId = message.getMessageProperties().getHeader(ShareableConstants.X_REQUEST_ID_HEADER);
MDC.put(ShareableConstants.X_REQUEST_ID_HEADER, XRequestId);
return message;
}
}
用户消费者
@RabbitListener(bindings =
@QueueBinding(exchange = @Exchange(ShareableConstants.EXCHANGE_NAME),
key = ShareableConstants.CREATE_USER_ROUTING_KEY,
value = @Queue(ShareableConstants.USER_REGISTRATION_QUEUE_NAME)))
public UserRegistrationDTO receiveUser(UserRegistrationDTO userRegistrationDTO) {
log.info("receiving user {} to register ", userRegistrationDTO);
User user = Optional.of(userRegistrationDTO).map(User::new).get();
User createdUser = userService.register(user);
UserRegistrationDTO registrationDTO = UserRegistrationDTO.builder()
.id(createdUser.getId())
.username(createdUser.getUsername())
.createdAt(createdUser.getCreationDate())
.build();
return registrationDTO;
}
这是代码,没有抛出异常,唯一的问题是拦截器被跳过
RabbitTemplate
未用于接收 @RabbitListener
的消息;消息由侦听器容器接收;您必须在侦听器容器工厂上设置 afterReceivePostProcessors
。
如果您使用 Spring 引导,只需将 auto-configured SimpleRabbitListenerContainerFactory
作为参数添加到您的其他 @Bean
之一,并在其上设置 MPP。
我首先为外发消息设置了一个拦截器,该拦截器工作正常,但是 当我尝试拦截消费者中的传入消息时,postProcessMessage 方法 被跳过并且消息到达用@RabbitListener注释的方法,波纹管是我整个过程的代码,我省略了不重要的代码。
制作人
RabbitMQProducerInterceptor
@Component
@Slf4j
public class RabbitMQProducerInterceptor implements MessagePostProcessor {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
log.info("Getting the current HttpServletRequest");
HttpServletRequest req = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes())
.getRequest();
log.info("Extracting the X-REQUEST-ID from the header of the HttpServletRequest");
String XRequestId = req.getHeader(ShareableConstants.X_REQUEST_ID_HEADER);
log.info("Adding X-REQUEST-ID {} to the RabbitMQ Producer Header", XRequestId);
message.getMessageProperties().getHeaders().put(ShareableConstants.X_REQUEST_ID_HEADER, XRequestId);
return message;
}
}
RabbitMQProducerConfig
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setReplyTimeout(60000);
MessagePostProcessor[] processors = {new RabbitMQProducerInterceptor()};
rabbitTemplate.addBeforePublishPostProcessors(processors);
return rabbitTemplate;
}
正在向消费者发送消息
用户制作人
public UserRegistrationResponseDTO register(UserRegistrationDTO userRegistrationDTO) {
log.info("Sending user registration request {}", userRegistrationDTO);
UserRegistrationDTO response = (UserRegistrationDTO) rabbitTemplate
.convertSendAndReceive(ShareableConstants.EXCHANGE_NAME,
ShareableConstants.CREATE_USER_ROUTING_KEY,
userRegistrationDTO);
return UserRegistrationResponseDTO.builder()
.username(response.getUsername())
.id(response.getId())
.createdAt(response.getCreatedAt()).build();
}
消费者
RabbitMQConsumerConfig
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
MessagePostProcessor[] processors = {new RabbitMQConsumerInterceptor()};
rabbitTemplate.setAfterReceivePostProcessors(processors);
return rabbitTemplate;
}
RabbitMQConsumerInterceptor
@Component
public class RabbitMQConsumerInterceptor implements MessagePostProcessor {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
String XRequestId = message.getMessageProperties().getHeader(ShareableConstants.X_REQUEST_ID_HEADER);
MDC.put(ShareableConstants.X_REQUEST_ID_HEADER, XRequestId);
return message;
}
}
用户消费者
@RabbitListener(bindings =
@QueueBinding(exchange = @Exchange(ShareableConstants.EXCHANGE_NAME),
key = ShareableConstants.CREATE_USER_ROUTING_KEY,
value = @Queue(ShareableConstants.USER_REGISTRATION_QUEUE_NAME)))
public UserRegistrationDTO receiveUser(UserRegistrationDTO userRegistrationDTO) {
log.info("receiving user {} to register ", userRegistrationDTO);
User user = Optional.of(userRegistrationDTO).map(User::new).get();
User createdUser = userService.register(user);
UserRegistrationDTO registrationDTO = UserRegistrationDTO.builder()
.id(createdUser.getId())
.username(createdUser.getUsername())
.createdAt(createdUser.getCreationDate())
.build();
return registrationDTO;
}
这是代码,没有抛出异常,唯一的问题是拦截器被跳过
RabbitTemplate
未用于接收 @RabbitListener
的消息;消息由侦听器容器接收;您必须在侦听器容器工厂上设置 afterReceivePostProcessors
。
如果您使用 Spring 引导,只需将 auto-configured SimpleRabbitListenerContainerFactory
作为参数添加到您的其他 @Bean
之一,并在其上设置 MPP。