Spring 集成 - AMQP 支持的消息通道和消息转换
Spring integration - AMQP backed message channels and message conversion
我正在尝试在我的 Spring 集成应用程序中使用 AMQP 支持的消息通道,但我认为我从根本上误解了一些东西,特别是围绕 Message<?>
接口以及 GenericMessage<?>
被写入和读取,一个 RabbitMQ 队列。
鉴于我有一个包含以下域模型对象的 Spring 集成应用程序:
@Immutable
class Foo {
String name
long quantity
}
我声明了一个名为 fooChannel
的 AMQP 支持的消息通道,如下所示:
@Bean
public AmqpChannelFactoryBean deliveryPlacementChannel(CachingConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true)
factoryBean.setConnectionFactory(connectionFactory)
factoryBean.setQueueName("foo")
factoryBean.beanName = 'fooChannel'
factoryBean.setPubSub(false)
factoryBean
}
当我最初尝试向我的 fooChannel
发送消息时,我收到了 java.io.NotSerializableException
。我意识到这是因为我的 AMQP 支持的 fooChannel
使用的 RabbitTemplate
使用的 org.springframework.amqp.support.converter.SimpleMessageConverter
只能与字符串、可序列化实例或字节数组一起使用,我的 Foo
模型是那些东西的 none。
因此,我认为我应该使用 org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
来确保我的 Foo
模型被正确转换 to/from 和 AMQP 消息。但是,添加到支持我的 fooChannel
的 RabbitMQ 队列的消息类型似乎是 org.springframework.messaging.support.GenericMessage
类型。这意味着当我的 AMQP 支持 fooChannel
尝试使用来自 RabbitMQ 队列的消息时,它会收到以下异常:
Caused by: com.fasterxml.jackson.databind.JsonMappingException: No suitable constructor found for type [simple type, class org.springframework.messaging.support.GenericMessage]: can not instantiate from JSON object (missing default constructor or creator, or perhaps need to add/enable type information?)
通过查看 GenericMessage
class,我发现它被设计为不可变的,这清楚地解释了为什么 Jackson2JsonMessageConverter
无法从 JSON 转换为GenericMessage
类型。但是,我不确定我应该做什么才能让我的 fooChannel
得到 AMQP 的支持,并使包含我的 Foo
模型的 Spring 集成消息的转换正常工作?
就我的应用程序流程而言,我有以下 Transformer
组件,它使用来自(非 AMQP 支持)barChannel
的 Bar
模型并放置 Foo
模特上fooChannel
如下:
@Transformer(inputChannel = 'barChannel', outputChannel = 'fooChannel')
public Foo transform(Bar bar) {
//transform logic removed for brevity
new Foo(name: 'Foo1', quantity: 1)
}
然后我有一个 ServiceActivator
组件,我希望从我的 fooChannel
中使用它,如下所示:
@ServiceActivator(inputChannel = 'fooChannel')
void consumeFoos(Foo foo){
// Do something with foo
}
我正在使用 spring-integration-core:4.2.5.RELEASE
和 spring-integration-amqp:4.2.5.RELEASE
。
谁能告诉我 Spring 集成应用程序的配置哪里出了问题?
如果需要任何进一步的信息以更好地阐明我的疑问或问题,请告诉我。谢谢
是 - amqp 支持的通道目前仅限于 Java 个可序列化对象。
我们应该提供一个选项来将 Message<?>
映射到 Spring AMQP Message
(就像通道适配器那样)而不是...
this.amqpTemplate.convertAndSend(this.getExchangeName(), this.getRoutingKey(), message);
...转换整个消息。
您可以使用一对通道适配器 (outbound/inbound) 而不是一个通道。
由于您使用的是 Java 配置,因此您可以将适配器对包装在新的 MessageChannel
实现中。
我开了一个JIRA Issue for this.
我正在尝试在我的 Spring 集成应用程序中使用 AMQP 支持的消息通道,但我认为我从根本上误解了一些东西,特别是围绕 Message<?>
接口以及 GenericMessage<?>
被写入和读取,一个 RabbitMQ 队列。
鉴于我有一个包含以下域模型对象的 Spring 集成应用程序:
@Immutable
class Foo {
String name
long quantity
}
我声明了一个名为 fooChannel
的 AMQP 支持的消息通道,如下所示:
@Bean
public AmqpChannelFactoryBean deliveryPlacementChannel(CachingConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true)
factoryBean.setConnectionFactory(connectionFactory)
factoryBean.setQueueName("foo")
factoryBean.beanName = 'fooChannel'
factoryBean.setPubSub(false)
factoryBean
}
当我最初尝试向我的 fooChannel
发送消息时,我收到了 java.io.NotSerializableException
。我意识到这是因为我的 AMQP 支持的 fooChannel
使用的 RabbitTemplate
使用的 org.springframework.amqp.support.converter.SimpleMessageConverter
只能与字符串、可序列化实例或字节数组一起使用,我的 Foo
模型是那些东西的 none。
因此,我认为我应该使用 org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
来确保我的 Foo
模型被正确转换 to/from 和 AMQP 消息。但是,添加到支持我的 fooChannel
的 RabbitMQ 队列的消息类型似乎是 org.springframework.messaging.support.GenericMessage
类型。这意味着当我的 AMQP 支持 fooChannel
尝试使用来自 RabbitMQ 队列的消息时,它会收到以下异常:
Caused by: com.fasterxml.jackson.databind.JsonMappingException: No suitable constructor found for type [simple type, class org.springframework.messaging.support.GenericMessage]: can not instantiate from JSON object (missing default constructor or creator, or perhaps need to add/enable type information?)
通过查看 GenericMessage
class,我发现它被设计为不可变的,这清楚地解释了为什么 Jackson2JsonMessageConverter
无法从 JSON 转换为GenericMessage
类型。但是,我不确定我应该做什么才能让我的 fooChannel
得到 AMQP 的支持,并使包含我的 Foo
模型的 Spring 集成消息的转换正常工作?
就我的应用程序流程而言,我有以下 Transformer
组件,它使用来自(非 AMQP 支持)barChannel
的 Bar
模型并放置 Foo
模特上fooChannel
如下:
@Transformer(inputChannel = 'barChannel', outputChannel = 'fooChannel')
public Foo transform(Bar bar) {
//transform logic removed for brevity
new Foo(name: 'Foo1', quantity: 1)
}
然后我有一个 ServiceActivator
组件,我希望从我的 fooChannel
中使用它,如下所示:
@ServiceActivator(inputChannel = 'fooChannel')
void consumeFoos(Foo foo){
// Do something with foo
}
我正在使用 spring-integration-core:4.2.5.RELEASE
和 spring-integration-amqp:4.2.5.RELEASE
。
谁能告诉我 Spring 集成应用程序的配置哪里出了问题?
如果需要任何进一步的信息以更好地阐明我的疑问或问题,请告诉我。谢谢
是 - amqp 支持的通道目前仅限于 Java 个可序列化对象。
我们应该提供一个选项来将 Message<?>
映射到 Spring AMQP Message
(就像通道适配器那样)而不是...
this.amqpTemplate.convertAndSend(this.getExchangeName(), this.getRoutingKey(), message);
...转换整个消息。
您可以使用一对通道适配器 (outbound/inbound) 而不是一个通道。
由于您使用的是 Java 配置,因此您可以将适配器对包装在新的 MessageChannel
实现中。
我开了一个JIRA Issue for this.