我如何在传入消息到达用@RabbitListener 注释的方法之前拦截它们?

How can i intercept incomig messages before they reach methods annotated with @RabbitListener?

我首先为外发消息设置了一个拦截器,该拦截器工作正常,但是 当我尝试拦截消费者中的传入消息时,postProcessMessage 方法 被跳过并且消息到达用@RabbitListener注释的方法,波纹管是我整个过程的代码,我省略了不重要的代码。

制作人

RabbitMQProducerInterceptor

@Component
@Slf4j
public class RabbitMQProducerInterceptor implements MessagePostProcessor {

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {

        log.info("Getting the current HttpServletRequest");
        HttpServletRequest req = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes())
                .getRequest();

        log.info("Extracting the X-REQUEST-ID from the header of the HttpServletRequest");
        String XRequestId = req.getHeader(ShareableConstants.X_REQUEST_ID_HEADER);

        log.info("Adding X-REQUEST-ID {} to the RabbitMQ Producer Header", XRequestId);
        message.getMessageProperties().getHeaders().put(ShareableConstants.X_REQUEST_ID_HEADER, XRequestId);

        return message;
    }
}

RabbitMQProducerConfig

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setReplyTimeout(60000);
        MessagePostProcessor[] processors = {new RabbitMQProducerInterceptor()};
        rabbitTemplate.addBeforePublishPostProcessors(processors);
        return rabbitTemplate;
    }

正在向消费者发送消息

用户制作人

    public UserRegistrationResponseDTO register(UserRegistrationDTO userRegistrationDTO) {
        log.info("Sending user registration request {}", userRegistrationDTO);

        UserRegistrationDTO response = (UserRegistrationDTO) rabbitTemplate
                .convertSendAndReceive(ShareableConstants.EXCHANGE_NAME,
                        ShareableConstants.CREATE_USER_ROUTING_KEY,
                        userRegistrationDTO);

        return UserRegistrationResponseDTO.builder()
                .username(response.getUsername())
                .id(response.getId())
                .createdAt(response.getCreatedAt()).build();
    }

消费者

RabbitMQConsumerConfig

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        MessagePostProcessor[] processors = {new RabbitMQConsumerInterceptor()};
        rabbitTemplate.setAfterReceivePostProcessors(processors);
        return rabbitTemplate;
    }

RabbitMQConsumerInterceptor


@Component
public class RabbitMQConsumerInterceptor implements MessagePostProcessor {

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {

        String XRequestId = message.getMessageProperties().getHeader(ShareableConstants.X_REQUEST_ID_HEADER);

        MDC.put(ShareableConstants.X_REQUEST_ID_HEADER, XRequestId);

        return message;
    }
}

用户消费者

@RabbitListener(bindings =
    @QueueBinding(exchange = @Exchange(ShareableConstants.EXCHANGE_NAME),
            key = ShareableConstants.CREATE_USER_ROUTING_KEY,
            value = @Queue(ShareableConstants.USER_REGISTRATION_QUEUE_NAME)))
    public UserRegistrationDTO receiveUser(UserRegistrationDTO userRegistrationDTO) {
        log.info("receiving user {} to register ", userRegistrationDTO);
        User user = Optional.of(userRegistrationDTO).map(User::new).get();
        User createdUser = userService.register(user);

        UserRegistrationDTO registrationDTO = UserRegistrationDTO.builder()
                .id(createdUser.getId())
                .username(createdUser.getUsername())
                .createdAt(createdUser.getCreationDate())
                .build();

        return registrationDTO;
    }

这是代码,没有抛出异常,唯一的问题是拦截器被跳过

RabbitTemplate 未用于接收 @RabbitListener 的消息;消息由侦听器容器接收;您必须在侦听器容器工厂上设置 afterReceivePostProcessors

如果您使用 Spring 引导,只需将 auto-configured SimpleRabbitListenerContainerFactory 作为参数添加到您的其他 @Bean 之一,并在其上设置 MPP。