为什么带有 Jackson2JsonMessageConverter 的 AmqpChannelFactoryBean 不存储类型?
Why does a AmqpChannelFactoryBean with Jackson2JsonMessageConverter not store type?
我正在尝试使用 Spring 与 RabbitMQ 的集成,使用 RabbitMQ 支持的 Spring 集成渠道。 (由于某种原因,这似乎几乎没有记录,这是新的吗?)。
为此,我似乎可以使用 AmqpChannelFactoryBean 来创建一个通道。
要设置消息转换,我使用 Jackson2JsonMessageConverter。
当我将 GenericMessage 与 POJO 负载一起使用时,它拒绝从 Java de-serialize 它,主要是因为它不知道类型。我本来希望类型会自动放在 header 上,但在 header 上只有 __TypeId__=org.springframework.messaging.support.GenericMessage
.
在 Spring 启动我的配置 class 看起来像这样:
@Configuration
public class IntegrationConfiguration {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpChannelFactoryBean myActivateOutChannel(CachingConnectionFactory connectionFactory,
MessageConverter messageConverter) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("myActivateOut");
factoryBean.setPubSub(false);
factoryBean.setAcknowledgeMode(AcknowledgeMode.AUTO);
factoryBean.setDefaultDeliveryMode(MessageDeliveryMode.PERSISTENT);
factoryBean.setMessageConverter(messageConverter);
return factoryBean;
}
@Bean
@ServiceActivator(inputChannel = "bsnkActivateOutChannel", autoStartup="true")
public MessageHandler mqttOutbound() {
return m -> System.out.println(m);
}
}
发送是这样完成的:
private final MessageChannel myActivateOutChannel;
@Autowired
public MySender(MessageChannel myActivateOutChannel) {
this.myActivateOutChannel = myActivateOutChannel;
}
@Override
public void run(ApplicationArguments args) throws Exception {
MyPojo pojo = new MyPojo();
Message<MyPojo> msg = new GenericMessage<>(pojo);
myActivateOutChannel.send(msg);
}
如果我设置自己的 classmapper,事情就会按预期进行。但如果我设置这样的东西,我将不得不使用许多 MessageConverters。
例如
converter.setClassMapper(new ClassMapper() {
@Override
public void fromClass(Class< ? > clazz, MessageProperties properties) {
}
@Override
public Class< ? > toClass(MessageProperties properties) {
return MyPojo.class;
}
});
我是不是用错了?我缺少一些配置吗?还有其他建议吗?
谢谢!! :)
注意:仔细观察,我猜 'Spring integration' 方法是在每一侧添加一个 Spring 集成 JSON 变压器,这意味着还要添加两个额外的每个 RabbitMQ 的直接通道 queue?
这对我来说感觉不对,因为那时我有三倍的渠道(6!对于 in/out),但也许这就是应该如何使用框架?将所有简单步骤与直接渠道结合起来? (在那种情况下,我是否保留 RabbitMQ 通道提供的持久性?或者如果我想要的话,我是否需要一些事务机制?或者它是直接通道工作方式所固有的吗?)
我还注意到现在有 Spring-integration MessageConverter 和 Spring-amqp MessageConverter。后者是我用过的。另一个会按照我想要的方式工作吗?快速浏览一下代码表明它没有在消息 header?
中存储 object 类型
在版本 4.3 之前,支持 amqp 的通道仅支持可序列化的负载;解决方法是改用通道适配器(支持映射)。
INT-3975 引入了一个新的 属性 extractPayload
,它导致消息头映射到 rabbitmq 头,消息体只是有效负载而不是序列化的 GenericMessage
.
将 extractPayload
设置为 true 应该可以解决您的问题。
我正在尝试使用 Spring 与 RabbitMQ 的集成,使用 RabbitMQ 支持的 Spring 集成渠道。 (由于某种原因,这似乎几乎没有记录,这是新的吗?)。
为此,我似乎可以使用 AmqpChannelFactoryBean 来创建一个通道。 要设置消息转换,我使用 Jackson2JsonMessageConverter。
当我将 GenericMessage 与 POJO 负载一起使用时,它拒绝从 Java de-serialize 它,主要是因为它不知道类型。我本来希望类型会自动放在 header 上,但在 header 上只有 __TypeId__=org.springframework.messaging.support.GenericMessage
.
在 Spring 启动我的配置 class 看起来像这样:
@Configuration
public class IntegrationConfiguration {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpChannelFactoryBean myActivateOutChannel(CachingConnectionFactory connectionFactory,
MessageConverter messageConverter) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("myActivateOut");
factoryBean.setPubSub(false);
factoryBean.setAcknowledgeMode(AcknowledgeMode.AUTO);
factoryBean.setDefaultDeliveryMode(MessageDeliveryMode.PERSISTENT);
factoryBean.setMessageConverter(messageConverter);
return factoryBean;
}
@Bean
@ServiceActivator(inputChannel = "bsnkActivateOutChannel", autoStartup="true")
public MessageHandler mqttOutbound() {
return m -> System.out.println(m);
}
}
发送是这样完成的:
private final MessageChannel myActivateOutChannel;
@Autowired
public MySender(MessageChannel myActivateOutChannel) {
this.myActivateOutChannel = myActivateOutChannel;
}
@Override
public void run(ApplicationArguments args) throws Exception {
MyPojo pojo = new MyPojo();
Message<MyPojo> msg = new GenericMessage<>(pojo);
myActivateOutChannel.send(msg);
}
如果我设置自己的 classmapper,事情就会按预期进行。但如果我设置这样的东西,我将不得不使用许多 MessageConverters。 例如
converter.setClassMapper(new ClassMapper() {
@Override
public void fromClass(Class< ? > clazz, MessageProperties properties) {
}
@Override
public Class< ? > toClass(MessageProperties properties) {
return MyPojo.class;
}
});
我是不是用错了?我缺少一些配置吗?还有其他建议吗?
谢谢!! :)
注意:仔细观察,我猜 'Spring integration' 方法是在每一侧添加一个 Spring 集成 JSON 变压器,这意味着还要添加两个额外的每个 RabbitMQ 的直接通道 queue? 这对我来说感觉不对,因为那时我有三倍的渠道(6!对于 in/out),但也许这就是应该如何使用框架?将所有简单步骤与直接渠道结合起来? (在那种情况下,我是否保留 RabbitMQ 通道提供的持久性?或者如果我想要的话,我是否需要一些事务机制?或者它是直接通道工作方式所固有的吗?)
我还注意到现在有 Spring-integration MessageConverter 和 Spring-amqp MessageConverter。后者是我用过的。另一个会按照我想要的方式工作吗?快速浏览一下代码表明它没有在消息 header?
中存储 object 类型在版本 4.3 之前,支持 amqp 的通道仅支持可序列化的负载;解决方法是改用通道适配器(支持映射)。
INT-3975 引入了一个新的 属性 extractPayload
,它导致消息头映射到 rabbitmq 头,消息体只是有效负载而不是序列化的 GenericMessage
.
将 extractPayload
设置为 true 应该可以解决您的问题。