如何将自定义 header 从 spring-cloud-stream (aws kinesis binder) 发送到 "legacy" spring 集成消费者
How to send custom header from spring-cloud-stream (aws kinesis binder) to "legacy" spring integration consumer
我有两个应用程序 - 第一个使用 spring-cloud-stream/function 和 AWS Kinesis Binder 生成消息,第二个是基于 spring 集成来使用消息的应用程序。两者之间的通信不是问题 - 我可以从“流”发送消息并在“集成”中轻松处理它。
当我想发送自定义 header 时,出现了问题。 header 作为嵌入式 header 使用“新”格式(开头有 0xff 等)到达消费者 - 请参阅 spring-cloud-stream.[=15 中的 AbstractMessageChannelBinder#serializeAndEmbedHeadersIfApplicable =]
但是,KinesisMessageDrivenChannelAdapter (spring-integration-aws) 似乎不理解“新”嵌入的 header 形式。它使用无法“解码”消息的 EmbeddedJsonHeadersMessageMapper(参见#toMessage)。由于嵌入的 header(0xff 等)中包含的附加信息,它会抛出 com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
。
我需要通过线路发送 header(header 用于在另一端路由),因此“关闭”header 不是一个选项由制作人负责。我看不到使用“旧”嵌入式 headers 的方法。
我想在制作方使用 spring-cloud-stream/function - 这太棒了。我希望我可以重做消费者,但是...
我可以编写自己的嵌入式 header 映射器来理解新格式(使用 EmbeddedHeaderUtils),并将其连接到 KinesisMessageDrivenChannelAdapter。
鉴于spring-cloud-stream和spring-integration之间的密切关系,我一定是做错了什么。 Spring Integration 是否具有理解新嵌入表单的 OutboundMessageMapper?
或者有没有办法强制 spring 云流使用不同的嵌入策略?
我可以在制作方使用 Spring 集成。 (悲伤的脸)。
有什么想法吗?提前致谢。
understands the new format
这不是一种“新”格式,它是 Spring Cloud Stream 创建的一种格式,最初是为 Kafka 创建的,它只在 0.11 中添加了 header 支持。
I could write my own embedded header mapper that understands the new format (use EmbeddedHeaderUtils
), and wire it into the KinesisMessageDrivenChannelAdapter
.
我建议您这样做,并考虑将 contributing it to the core Spring Integration Project 与 EmbeddedJsonHeadersMessageMapper
一起使用,以便它可以与所有不支持本机 header 的技术一起使用。
我有两个应用程序 - 第一个使用 spring-cloud-stream/function 和 AWS Kinesis Binder 生成消息,第二个是基于 spring 集成来使用消息的应用程序。两者之间的通信不是问题 - 我可以从“流”发送消息并在“集成”中轻松处理它。
当我想发送自定义 header 时,出现了问题。 header 作为嵌入式 header 使用“新”格式(开头有 0xff 等)到达消费者 - 请参阅 spring-cloud-stream.[=15 中的 AbstractMessageChannelBinder#serializeAndEmbedHeadersIfApplicable =]
但是,KinesisMessageDrivenChannelAdapter (spring-integration-aws) 似乎不理解“新”嵌入的 header 形式。它使用无法“解码”消息的 EmbeddedJsonHeadersMessageMapper(参见#toMessage)。由于嵌入的 header(0xff 等)中包含的附加信息,它会抛出 com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
。
我需要通过线路发送 header(header 用于在另一端路由),因此“关闭”header 不是一个选项由制作人负责。我看不到使用“旧”嵌入式 headers 的方法。
我想在制作方使用 spring-cloud-stream/function - 这太棒了。我希望我可以重做消费者,但是...
我可以编写自己的嵌入式 header 映射器来理解新格式(使用 EmbeddedHeaderUtils),并将其连接到 KinesisMessageDrivenChannelAdapter。
鉴于spring-cloud-stream和spring-integration之间的密切关系,我一定是做错了什么。 Spring Integration 是否具有理解新嵌入表单的 OutboundMessageMapper?
或者有没有办法强制 spring 云流使用不同的嵌入策略?
我可以在制作方使用 Spring 集成。 (悲伤的脸)。
有什么想法吗?提前致谢。
understands the new format
这不是一种“新”格式,它是 Spring Cloud Stream 创建的一种格式,最初是为 Kafka 创建的,它只在 0.11 中添加了 header 支持。
I could write my own embedded header mapper that understands the new format (use
EmbeddedHeaderUtils
), and wire it into theKinesisMessageDrivenChannelAdapter
.
我建议您这样做,并考虑将 contributing it to the core Spring Integration Project 与 EmbeddedJsonHeadersMessageMapper
一起使用,以便它可以与所有不支持本机 header 的技术一起使用。