Spring 集成 - AMQP 支持的消息通道和消息转换

Spring integration - AMQP backed message channels and message conversion

我正在尝试在我的 Spring 集成应用程序中使用 AMQP 支持的消息通道,但我认为我从根本上误解了一些东西,特别是围绕 Message<?> 接口以及 GenericMessage<?> 被写入和读取,一个 RabbitMQ 队列。

鉴于我有一个包含以下域模型对象的 Spring 集成应用程序:

@Immutable
class Foo {
  String name
  long quantity
}

我声明了一个名为 fooChannel 的 AMQP 支持的消息通道,如下所示:

@Bean
public AmqpChannelFactoryBean deliveryPlacementChannel(CachingConnectionFactory connectionFactory) {
  AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true)
  factoryBean.setConnectionFactory(connectionFactory)
  factoryBean.setQueueName("foo")
  factoryBean.beanName = 'fooChannel'
  factoryBean.setPubSub(false)
  factoryBean
}

当我最初尝试向我的 fooChannel 发送消息时,我收到了 java.io.NotSerializableException。我意识到这是因为我的 AMQP 支持的 fooChannel 使用的 RabbitTemplate 使用的 org.springframework.amqp.support.converter.SimpleMessageConverter 只能与字符串、可序列化实例或字节数组一起使用,我的 Foo 模型是那些东西的 none。

因此,我认为我应该使用 org.springframework.amqp.support.converter.Jackson2JsonMessageConverter 来确保我的 Foo 模型被​​正确转换 to/from 和 AMQP 消息。但是,添加到支持我的 fooChannel 的 RabbitMQ 队列的消息类型似乎是 org.springframework.messaging.support.GenericMessage 类型。这意味着当我的 AMQP 支持 fooChannel 尝试使用来自 RabbitMQ 队列的消息时,它会收到以下异常:

Caused by: com.fasterxml.jackson.databind.JsonMappingException: No suitable constructor found for type [simple type, class org.springframework.messaging.support.GenericMessage]: can not instantiate from JSON object (missing default constructor or creator, or perhaps need to add/enable type information?)

通过查看 GenericMessage class,我发现它被设计为不可变的,这清楚地解释了为什么 Jackson2JsonMessageConverter 无法从 JSON 转换为GenericMessage 类型。但是,我不确定我应该做什么才能让我的 fooChannel 得到 AMQP 的支持,并使包含我的 Foo 模型的 Spring 集成消息的转换正常工作?

就我的应用程序流程而言,我有以下 Transformer 组件,它使用来自(非 AMQP 支持)barChannelBar 模型并放置 Foo模特上fooChannel如下:

@Transformer(inputChannel = 'barChannel', outputChannel = 'fooChannel')
public Foo transform(Bar bar) {
  //transform logic removed for brevity
  new Foo(name: 'Foo1', quantity: 1)
}

然后我有一个 ServiceActivator 组件,我希望从我的 fooChannel 中使用它,如下所示:

@ServiceActivator(inputChannel = 'fooChannel')
void consumeFoos(Foo foo){
  // Do something with foo
} 

我正在使用 spring-integration-core:4.2.5.RELEASEspring-integration-amqp:4.2.5.RELEASE

谁能告诉我 Spring 集成应用程序的配置哪里出了问题?

如果需要任何进一步的信息以更好地阐明我的疑问或问题,请告诉我。谢谢

是 - amqp 支持的通道目前仅限于 Java 个可序列化对象。

我们应该提供一个选项来将 Message<?> 映射到 Spring AMQP Message(就像通道适配器那样)而不是...

this.amqpTemplate.convertAndSend(this.getExchangeName(), this.getRoutingKey(), message);

...转换整个消息。

您可以使用一对通道适配器 (outbound/inbound) 而不是一个通道。

由于您使用的是 Java 配置,因此您可以将适配器对包装在新的 MessageChannel 实现中。

我开了一个JIRA Issue for this.