无法解码 json 类型 spring 云流 DefaultKafkaHeaderMapper
Could not decode json type in spring cloud stream DefaultKafkaHeaderMapper
我们正在使用 spring-cloud-stream 并计划升级我们的 Kafka 版本。
我们的应用程序使用 spring-cloud-stream:2.0.0
(spring-kafka 2.1.7) 和 apache kafka 服务器 1.0.1
并使用 spring-cloud-sleuth:2.0.0
进行跟踪。
我们要将我们的 Kafka 服务器升级到版本 2.3.0
,所以它需要升级到 spring-boot 2.2.x (Hoxton)
和 spring-cloud-sleuth:2.2.0
和 spring-cloud-stream:3.0.3 (Horsham.SR3)
.
我们有大约 200 个使用 Kafka 的应用程序,因此升级将逐步进行,因此作为中间状态,我们将在较新版本上有 producer 和 consumers 使用旧版本。
我们的消费者正在使用 @StreamListener
.
在我们的测试期间,我们遇到了一个问题,即解析大多数类型为 String
的 header 并得到以下内容:
ERROR 27448 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder : Could not decode json type: ecb89ccb3e79418b for key: X-B3-TraceId
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ecb89ccb3e79418b': was expecting ('true', 'false' or 'null')
at [Source: (byte[])"ecb89ccb3e79418b"; line: 1, column: 33]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[jackson-core-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679) ~[jackson-core-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526) ~[jackson-core-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621) ~[jackson-core-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826) ~[jackson-core-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723) ~[jackson-core-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141) ~[jackson-databind-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000) ~[jackson-databind-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091) ~[jackson-databind-2.9.6.jar:2.9.6]
at org.springframework.kafka.support.DefaultKafkaHeaderMapper.lambda$toHeaders(DefaultKafkaHeaderMapper.java:233) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
at java.lang.Iterable.forEach(Iterable.java:75) ~[na:1.8.0_221]
at org.springframework.kafka.support.DefaultKafkaHeaderMapper.toHeaders(DefaultKafkaHeaderMapper.java:216) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.toHeaders(KafkaMessageChannelBinder.java:554) ~[spring-cloud-stream-binder-kafka-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:106) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:229) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
...
而类型 header 是:
{spanTraceId=java.lang.String, spanId=java.lang.String, spanParentSpanId=java.lang.String, nativeHeaders=org.springframework.util.LinkedMultiValueMap, X-B3-SpanId=java.lang.String, X-B3-ParentSpanId=java.lang.String, scst_partition=java.lang.Integer, X-B3-Sampled=java.lang.String, X-B3-TraceId=java.lang.String, spanSampled=java.lang.String, contentType=java.lang.String}
例如 Sleuth 添加的 X-B3-SpanId
是字符串类型,值为:ecb89ccb3e79418b 不是 JSON 字符串,因此ObjectMapper 在转换为字符串 Object 时失败:
headers.put(h.key(), getObjectMapper().readValue(h.value(), type))
看起来它不应该在我们有 String 类型时使用 ObjectMapper,因此我们的老消费者失败了。
有没有办法在使用新生产者和旧消费者时防止这个问题?
您可以配置 DefaultKafkaHeaderMapper
以与旧版本兼容:
/**
* Set to true to encode String-valued headers as JSON ("..."), by default just the
* raw String value is converted to a byte array using the configured charset. Set to
* true if a consumer of the outbound record is using Spring for Apache Kafka version
* less than 2.3
* @param encodeStrings true to encode (default false).
* @since 2.3
*/
public void setEncodeStrings(boolean encodeStrings) {
this.encodeStrings = encodeStrings;
}
spring.cloud.stream.kafka.binder.headerMapperBeanName
我们正在使用 spring-cloud-stream 并计划升级我们的 Kafka 版本。
我们的应用程序使用 spring-cloud-stream:2.0.0
(spring-kafka 2.1.7) 和 apache kafka 服务器 1.0.1
并使用 spring-cloud-sleuth:2.0.0
进行跟踪。
我们要将我们的 Kafka 服务器升级到版本 2.3.0
,所以它需要升级到 spring-boot 2.2.x (Hoxton)
和 spring-cloud-sleuth:2.2.0
和 spring-cloud-stream:3.0.3 (Horsham.SR3)
.
我们有大约 200 个使用 Kafka 的应用程序,因此升级将逐步进行,因此作为中间状态,我们将在较新版本上有 producer 和 consumers 使用旧版本。
我们的消费者正在使用 @StreamListener
.
在我们的测试期间,我们遇到了一个问题,即解析大多数类型为 String
的 header 并得到以下内容:
ERROR 27448 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder : Could not decode json type: ecb89ccb3e79418b for key: X-B3-TraceId
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ecb89ccb3e79418b': was expecting ('true', 'false' or 'null')
at [Source: (byte[])"ecb89ccb3e79418b"; line: 1, column: 33]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[jackson-core-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679) ~[jackson-core-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526) ~[jackson-core-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621) ~[jackson-core-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826) ~[jackson-core-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723) ~[jackson-core-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141) ~[jackson-databind-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000) ~[jackson-databind-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091) ~[jackson-databind-2.9.6.jar:2.9.6]
at org.springframework.kafka.support.DefaultKafkaHeaderMapper.lambda$toHeaders(DefaultKafkaHeaderMapper.java:233) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
at java.lang.Iterable.forEach(Iterable.java:75) ~[na:1.8.0_221]
at org.springframework.kafka.support.DefaultKafkaHeaderMapper.toHeaders(DefaultKafkaHeaderMapper.java:216) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.toHeaders(KafkaMessageChannelBinder.java:554) ~[spring-cloud-stream-binder-kafka-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:106) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:229) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
...
而类型 header 是:
{spanTraceId=java.lang.String, spanId=java.lang.String, spanParentSpanId=java.lang.String, nativeHeaders=org.springframework.util.LinkedMultiValueMap, X-B3-SpanId=java.lang.String, X-B3-ParentSpanId=java.lang.String, scst_partition=java.lang.Integer, X-B3-Sampled=java.lang.String, X-B3-TraceId=java.lang.String, spanSampled=java.lang.String, contentType=java.lang.String}
例如 Sleuth 添加的 X-B3-SpanId
是字符串类型,值为:ecb89ccb3e79418b 不是 JSON 字符串,因此ObjectMapper 在转换为字符串 Object 时失败:
headers.put(h.key(), getObjectMapper().readValue(h.value(), type))
看起来它不应该在我们有 String 类型时使用 ObjectMapper,因此我们的老消费者失败了。
有没有办法在使用新生产者和旧消费者时防止这个问题?
您可以配置 DefaultKafkaHeaderMapper
以与旧版本兼容:
/**
* Set to true to encode String-valued headers as JSON ("..."), by default just the
* raw String value is converted to a byte array using the configured charset. Set to
* true if a consumer of the outbound record is using Spring for Apache Kafka version
* less than 2.3
* @param encodeStrings true to encode (default false).
* @since 2.3
*/
public void setEncodeStrings(boolean encodeStrings) {
this.encodeStrings = encodeStrings;
}
spring.cloud.stream.kafka.binder.headerMapperBeanName