如何将自定义 header 从 spring-cloud-stream (aws kinesis binder) 发送到 "legacy" spring 集成消费者

How to send custom header from spring-cloud-stream (aws kinesis binder) to "legacy" spring integration consumer

我有两个应用程序 - 第一个使用 spring-cloud-stream/function 和 AWS Kinesis Binder 生成消息,第二个是基于 spring 集成来使用消息的应用程序。两者之间的通信不是问题 - 我可以从“流”发送消息并在“集成”中轻松处理它。

当我想发送自定义 header 时,出现了问题。 header 作为嵌入式 header 使用“新”格式(开头有 0xff 等)到达消费者 - 请参阅 spring-cloud-stream.[=15 中的 AbstractMessageChannelBinder#serializeAndEmbedHeadersIfApplicable =]

但是,KinesisMessageDrivenChannelAdapter (spring-integration-aws) 似乎不理解“新”嵌入的 header 形式。它使用无法“解码”消息的 EmbeddedJsonHeadersMessageMapper(参见#toMessage)。由于嵌入的 header(0xff 等)中包含的附加信息,它会抛出 com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')

我需要通过线路发送 header(header 用于在另一端路由),因此“关闭”header 不是一个选项由制作人负责。我看不到使用“旧”嵌入式 headers 的方法。

我想在制作方使用 spring-cloud-stream/function - 这太棒了。我希望我可以重做消费者,但是...

我可以编写自己的嵌入式 header 映射器来理解新格式(使用 EmbeddedHeaderUtils),并将其连接到 KinesisMessageDrivenChannelAdapter。

鉴于spring-cloud-stream和spring-integration之间的密切关系,我一定是做错了什么。 Spring Integration 是否具有理解新嵌入表单的 OutboundMessageMapper?

或者有没有办法强制 spring 云流使用不同的嵌入策略?

我可以在制作方使用 Spring 集成。 (悲伤的脸)。

有什么想法吗?提前致谢。

understands the new format

这不是一种“新”格式,它是 Spring Cloud Stream 创建的一种格式,最初是为 Kafka 创建的,它只在 0.11 中添加了 header 支持。

I could write my own embedded header mapper that understands the new format (use EmbeddedHeaderUtils), and wire it into the KinesisMessageDrivenChannelAdapter.

我建议您这样做,并考虑将 contributing it to the core Spring Integration ProjectEmbeddedJsonHeadersMessageMapper 一起使用,以便它可以与所有不支持本机 header 的技术一起使用。