为什么带有 Jackson2JsonMessageConverter 的 AmqpChannelFactoryBean 不存储类型?

Why does a AmqpChannelFactoryBean with Jackson2JsonMessageConverter not store type?

我正在尝试使用 Spring 与 RabbitMQ 的集成,使用 RabbitMQ 支持的 Spring 集成渠道。 (由于某种原因,这似乎几乎没有记录,这是新的吗?)。

为此,我似乎可以使用 AmqpChannelFactoryBean 来创建一个通道。 要设置消息转换,我使用 Jackson2JsonMessageConverter。

当我将 GenericMessage 与 POJO 负载一起使用时,它拒绝从 Java de-serialize 它,主要是因为它不知道类型。我本来希望类型会自动放在 header 上,但在 header 上只有 __TypeId__=org.springframework.messaging.support.GenericMessage.

在 Spring 启动我的配置 class 看起来像这样:

@Configuration
public class IntegrationConfiguration {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpChannelFactoryBean myActivateOutChannel(CachingConnectionFactory connectionFactory,
        MessageConverter messageConverter) {

        AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
        factoryBean.setConnectionFactory(connectionFactory);
        factoryBean.setQueueName("myActivateOut");
        factoryBean.setPubSub(false);
        factoryBean.setAcknowledgeMode(AcknowledgeMode.AUTO);
        factoryBean.setDefaultDeliveryMode(MessageDeliveryMode.PERSISTENT);
        factoryBean.setMessageConverter(messageConverter);
        return factoryBean;
    }

    @Bean
    @ServiceActivator(inputChannel = "bsnkActivateOutChannel", autoStartup="true")
    public MessageHandler mqttOutbound() {

        return m -> System.out.println(m);
    }

}

发送是这样完成的:

private final MessageChannel myActivateOutChannel;

@Autowired
public MySender(MessageChannel myActivateOutChannel) {
    this.myActivateOutChannel = myActivateOutChannel;
}

@Override
public void run(ApplicationArguments args) throws Exception {
    MyPojo pojo = new MyPojo();
    Message<MyPojo> msg = new GenericMessage<>(pojo);

    myActivateOutChannel.send(msg);
}

如果我设置自己的 classmapper,事情就会按预期进行。但如果我设置这样的东西,我将不得不使用许多 MessageConverters。 例如

    converter.setClassMapper(new ClassMapper() {

        @Override
        public void fromClass(Class< ? > clazz, MessageProperties properties) {
        }

        @Override
        public Class< ? > toClass(MessageProperties properties) {
            return MyPojo.class;
        }

    });

我是不是用错了?我缺少一些配置吗?还有其他建议吗?

谢谢!! :)

注意:仔细观察,我猜 'Spring integration' 方法是在每一侧添加一个 Spring 集成 JSON 变压器,这意味着还要添加两个额外的每个 RabbitMQ 的直接通道 queue? 这对我来说感觉不对,因为那时我有三倍的渠道(6!对于 in/out),但也许这就是应该如何使用框架?将所有简单步骤与直接渠道结合起来? (在那种情况下,我是否保留 RabbitMQ 通道提供的持久性?或者如果我想要的话,我是否需要一些事务机制?或者它是直接通道工作方式所固有的吗?)

我还注意到现在有 Spring-integration MessageConverter 和 Spring-amqp MessageConverter。后者是我用过的。另一个会按照我想要的方式工作吗?快速浏览一下代码表明它没有在消息 header?

中存储 object 类型

在版本 4.3 之前,支持 amqp 的通道仅支持可序列化的负载;解决方法是改用通道适配器(支持映射)。

INT-3975 引入了一个新的 属性 extractPayload,它导致消息头映射到 rabbitmq 头,消息体只是有效负载而不是序列化的 GenericMessage.

extractPayload 设置为 true 应该可以解决您的问题。