Spring 集成 - Java DSL 中的 AMQP 推断类型?
Spring Integration - AMQP Inferred Types In Java DSL?
我一直在研究 "paved road" 使用 AMQP 在两个微服务之间设置异步消息传递。我们希望为每个服务推广使用单独的域 objects,这意味着每个服务必须定义自己的任何 objects 的副本,这些 objects 通过 queue.
我们在生产者和消费者端都使用 Jackson2JsonMessageConverter
,并且我们正在使用 Java DSL 连接流 to/from 和 queue。
我确定有办法做到这一点,但它正在逃避我:我希望消费者方忽略从生产者传递的 __TypeID__
header 作为消费者可能对该事件有不同的表示(并且它可能在不同的 java 包中)。
似乎已经完成了一些工作,如果使用注释 @RabbitListener
,则会派生一个 inferredArgumentType
参数并将覆盖 header 信息。这正是我想要做的,但我想使用 Java DSL 来完成它。我还没有找到一种干净的方法来做到这一点,也许我只是遗漏了一些明显的东西。使用以下 DSL 时,派生类型似乎相当简单:
return IntegrationFlows
.from(
Amqp.inboundAdapter(factory, queueRemoteTaskStatus())
.concurrentConsumers(10)
.errorHandler(errorHandler)
.messageConverter(messageConverter)
)
.channel(channelRemoteTaskStatusIn())
.handle(listener, "handleRemoteTaskStatus")
.get();
但是,这会导致 ClassNotFound
异常。到目前为止,我发现解决这个问题的唯一方法是设置自定义消息转换器,这需要明确定义类型。
public class ForcedTypeJsonMessageConverter extends Jackson2JsonMessageConverter {
ForcedTypeJsonMessageConverter(final Class<?> forcedType) {
setClassMapper(new ClassMapper() {
@Override
public void fromClass(Class<?> clazz, MessageProperties properties) {
//this class is only used for inbound marshalling.
}
@Override
public Class<?> toClass(MessageProperties properties) {
return forcedType;
}
});
}
}
我真的很想派生这个,所以开发人员不必真正处理这个。
有更简单的方法吗?
最简单的方法是将 Jackson 转换器的 DefaultJackson2JavaTypeMapper
配置为 TypeIdMapping
(setIdClassMapping()
)。
在发送系统上,映射 foo:com.one.Foo
,在接收系统上映射 foo:com.two.Foo
。
然后,__TypeId__
header 得到 foo
,接收系统会将其映射到 Foo
.
的表示
编辑
另一种选择是将 afterReceiveMessagePostProcessor
添加到入站通道适配器的侦听器容器 - 它可以更改 __TypeId__
header.
我一直在研究 "paved road" 使用 AMQP 在两个微服务之间设置异步消息传递。我们希望为每个服务推广使用单独的域 objects,这意味着每个服务必须定义自己的任何 objects 的副本,这些 objects 通过 queue.
我们在生产者和消费者端都使用 Jackson2JsonMessageConverter
,并且我们正在使用 Java DSL 连接流 to/from 和 queue。
我确定有办法做到这一点,但它正在逃避我:我希望消费者方忽略从生产者传递的 __TypeID__
header 作为消费者可能对该事件有不同的表示(并且它可能在不同的 java 包中)。
似乎已经完成了一些工作,如果使用注释 @RabbitListener
,则会派生一个 inferredArgumentType
参数并将覆盖 header 信息。这正是我想要做的,但我想使用 Java DSL 来完成它。我还没有找到一种干净的方法来做到这一点,也许我只是遗漏了一些明显的东西。使用以下 DSL 时,派生类型似乎相当简单:
return IntegrationFlows
.from(
Amqp.inboundAdapter(factory, queueRemoteTaskStatus())
.concurrentConsumers(10)
.errorHandler(errorHandler)
.messageConverter(messageConverter)
)
.channel(channelRemoteTaskStatusIn())
.handle(listener, "handleRemoteTaskStatus")
.get();
但是,这会导致 ClassNotFound
异常。到目前为止,我发现解决这个问题的唯一方法是设置自定义消息转换器,这需要明确定义类型。
public class ForcedTypeJsonMessageConverter extends Jackson2JsonMessageConverter {
ForcedTypeJsonMessageConverter(final Class<?> forcedType) {
setClassMapper(new ClassMapper() {
@Override
public void fromClass(Class<?> clazz, MessageProperties properties) {
//this class is only used for inbound marshalling.
}
@Override
public Class<?> toClass(MessageProperties properties) {
return forcedType;
}
});
}
}
我真的很想派生这个,所以开发人员不必真正处理这个。
有更简单的方法吗?
最简单的方法是将 Jackson 转换器的 DefaultJackson2JavaTypeMapper
配置为 TypeIdMapping
(setIdClassMapping()
)。
在发送系统上,映射 foo:com.one.Foo
,在接收系统上映射 foo:com.two.Foo
。
然后,__TypeId__
header 得到 foo
,接收系统会将其映射到 Foo
.
编辑
另一种选择是将 afterReceiveMessagePostProcessor
添加到入站通道适配器的侦听器容器 - 它可以更改 __TypeId__
header.