spring-amqp看兔消息最早的入口点是什么?

What's the earliest point of entry to read a rabbit message in spring-amqp?

我将 thread-local 兔子消息数据存储在 MDC 中。我想为传入的兔子消息清除旧的并添加新的上下文数据,例如从 headers 读取某些值或将兔子消息有效负载读取为 byte[]。不幸的是,我经常看到在消息到达我的 @RabbitHandler 注释方法之前发生异常。是否有一个更早的 entry-point 我可以连接到它来建立这个上下文?我不知道在反序列化发生之前会发生什么,但理想情况下我希望在尝试反序列化之前访问该消息。也许某处有一个 onMessageReceived(byte[] message, Map headers) 方法挂钩。调用堆栈越早越好。

@RabbitHandlerAbstractRabbitListenerContainerFactory 填充,可以随自定义 MessageConverter 一起提供:https://docs.spring.io/spring-amqp/docs/2.0.1.RELEASE/reference/html/_reference.html#message-converters。它的 fromMessage() 是从 MessagingMessageListenerAdapter.toMessagingMessage() 调用的。这是在 MessagingMessageListenerAdapter.onMessage() 中完成的。那确实是您可以上钩的非常早的地方。你真的还有一个原始的 org.springframework.amqp.core.Message object 没有任何转换并且具有所有可用的 headers 和属性。

嗯,你也可以注射:

/**
 * @param afterReceivePostProcessors the post processors.
 * @see AbstractMessageListenerContainer#setAfterReceivePostProcessors(MessagePostProcessor...)
 */
public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors) {

出于与您请求类似的原因。

一个解决方案是使用 SimpleMessageListenerContainer 而不是 @RabbitHandler 注释,并使用自定义消息侦听器适配器。

示例:

@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                         MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(queueName);
    container.setMessageListener(listenerAdapter); // custom listener
    container.setMessageConverter(null); // disable default conversion
    return container;
}

@Bean
MessageListenerAdapter listenerAdapter() {
    RawMessageDelegate delegate = new RawMessageDelegate();
    return new MessageListenerAdapter(delegate);
}

public class RawMessageDelegate {

    void handleMessage(Message message) {
        byte[] body = message.getBody();
        MessageProperties properties = message.getMessageProperties();
        Map<String, Object> headers = properties.getHeaders();
        // handle raw data
    }

}