Spring 一次垃圾输入后云流应用程序退出
Spring cloud stream application quits after one garbage input
使用 KStream 组件的 Spring 云流应用程序有问题。它正在侦听一个输入并在处理消息后将消息定向到一个输出。
它期待 JSON 字符串进入并尝试在到达时将其转换为 Spring 元组。发送消息时会发生相反的情况。
问题是,当系统管理员想用 kafka-console-producer.sh
测试一个主题时...并打印一个字符串
"lol"
在其中,整个 Spring 云流应用程序将死在那里,但有以下例外:
java.lang.RuntimeException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'lol': was expecting ('true', 'false' or 'null')
at [Source: lol; line: 1, column: 7]
at org.springframework.tuple.JsonStringToTupleConverter.convert(JsonStringToTupleConverter.java:71) ~[spring-tuple-1.0.0.RELEASE.jar:na]
at org.springframework.tuple.JsonStringToTupleConverter.convert(JsonStringToTupleConverter.java:31) ~[spring-tuple-1.0.0.RELEASE.jar:na]
at org.springframework.tuple.TupleBuilder.fromString(TupleBuilder.java:153) ~[spring-tuple-1.0.0.RELEASE.jar:na]
at org.springframework.cloud.stream.converter.TupleJsonMessageConverter.convertFromInternal(TupleJsonMessageConverter.java:90) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:175) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:167) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:55) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at org.springframework.cloud.stream.binder.kstream.KStreamListenerParameterAdapter.apply(KStreamListenerParameterAdapter.java:66) ~[spring-cloud-stream-binder-kstream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42) ~[kafka-streams-0.10.1.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) ~[kafka-streams-0.10.1.1.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180) ~[kafka-streams-0.10.1.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) ~[kafka-streams-0.10.1.1.jar:na]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'lol': was expecting ('true', 'false' or 'null')
at [Source: lol; line: 1, column: 7]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702) ~[jackson-core-2.8.10.jar:2.8.10]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558) ~[jackson-core-2.8.10.jar:2.8.10]
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2839) ~[jackson-core-2.8.10.jar:2.8.10]
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1903) ~[jackson-core-2.8.10.jar:2.8.10]
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:749) ~[jackson-core-2.8.10.jar:2.8.10]
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3850) ~[jackson-databind-2.8.10.jar:2.8.10]
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3799) ~[jackson-databind-2.8.10.jar:2.8.10]
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2397) ~[jackson-databind-2.8.10.jar:2.8.10]
at org.springframework.tuple.JsonStringToTupleConverter.convert(JsonStringToTupleConverter.java:44) ~[spring-tuple-1.0.0.RELEASE.jar:na]
我希望该框架至少对此类行为具有一定的容错能力。您不能期望输入总是很好而且很漂亮。
所以我查看了 Spring 文档:https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_configuration_options
并且有一些配置选项似乎是一些隐藏的重试逻辑实现,以防失败。例如 maxAttempts
参数。但是这个参数已经使用了默认值 3,但我没有看到 Spring 云流应用程序试图挽救这个错误。
所以我想知道为 Spring 云流应用程序构建错误输入容忍度的推荐方法是什么。
应用程序的配置如下所示:
spring:
cloud:
stream:
bindings:
input:
content-type: application/json
destination: inbound
group: fraud
consumer:
headerMode: raw
output:
content-type: application/x-spring-tuple
destination: outbound
producer:
headerMode: raw
useNativeEncoding: true
spring.cloud.stream.kstream.binder.configuration:
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
在 Spring Cloud Stream 1.3.x (Ditmars) 中,对 Kafka Streams 的错误处理支持非常有限。事实上,由应用程序来处理 1.3 kafka 流库中的任何错误。但是,在 2.0.0 中,我们添加了对 KIP-161 的支持。 https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
使用kafka streams binder 2.0.0版本的这个新特性,你可以logAndSkip记录或者logAndFail记录反序列化错误。除了这些,binder还提供了一个DLQ发送异常处理器的实现。所有这些的文档仍在 2.0 行上更新。准备就绪后,我将在此处更新文档链接。但是,这是它的要点。
spring.cloud.stream.kafka.streams.binder.serdeError: sendToDlq
(或 logAndFail 或 logAndSkip)
spring.cloud.stream.kafka.stream.bindings.input.consumer.dlqName:[dlq name]
- 如果未提供,则为 error.[incoming-topic].[group-name]
。
然后你会在DLQ主题中看到反序列化出错的记录。
同样,这些功能仅在 2.0.0.BUILD-SNAPSHOT 中可用,并且将成为即将发布的 2.0.0.RC1
版本的一部分。
使用 KStream 组件的 Spring 云流应用程序有问题。它正在侦听一个输入并在处理消息后将消息定向到一个输出。
它期待 JSON 字符串进入并尝试在到达时将其转换为 Spring 元组。发送消息时会发生相反的情况。
问题是,当系统管理员想用 kafka-console-producer.sh
测试一个主题时...并打印一个字符串
"lol"
在其中,整个 Spring 云流应用程序将死在那里,但有以下例外:
java.lang.RuntimeException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'lol': was expecting ('true', 'false' or 'null')
at [Source: lol; line: 1, column: 7]
at org.springframework.tuple.JsonStringToTupleConverter.convert(JsonStringToTupleConverter.java:71) ~[spring-tuple-1.0.0.RELEASE.jar:na]
at org.springframework.tuple.JsonStringToTupleConverter.convert(JsonStringToTupleConverter.java:31) ~[spring-tuple-1.0.0.RELEASE.jar:na]
at org.springframework.tuple.TupleBuilder.fromString(TupleBuilder.java:153) ~[spring-tuple-1.0.0.RELEASE.jar:na]
at org.springframework.cloud.stream.converter.TupleJsonMessageConverter.convertFromInternal(TupleJsonMessageConverter.java:90) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:175) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:167) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:55) ~[spring-messaging-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at org.springframework.cloud.stream.binder.kstream.KStreamListenerParameterAdapter.apply(KStreamListenerParameterAdapter.java:66) ~[spring-cloud-stream-binder-kstream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42) ~[kafka-streams-0.10.1.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) ~[kafka-streams-0.10.1.1.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180) ~[kafka-streams-0.10.1.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) ~[kafka-streams-0.10.1.1.jar:na]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'lol': was expecting ('true', 'false' or 'null')
at [Source: lol; line: 1, column: 7]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702) ~[jackson-core-2.8.10.jar:2.8.10]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558) ~[jackson-core-2.8.10.jar:2.8.10]
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2839) ~[jackson-core-2.8.10.jar:2.8.10]
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1903) ~[jackson-core-2.8.10.jar:2.8.10]
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:749) ~[jackson-core-2.8.10.jar:2.8.10]
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3850) ~[jackson-databind-2.8.10.jar:2.8.10]
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3799) ~[jackson-databind-2.8.10.jar:2.8.10]
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2397) ~[jackson-databind-2.8.10.jar:2.8.10]
at org.springframework.tuple.JsonStringToTupleConverter.convert(JsonStringToTupleConverter.java:44) ~[spring-tuple-1.0.0.RELEASE.jar:na]
我希望该框架至少对此类行为具有一定的容错能力。您不能期望输入总是很好而且很漂亮。 所以我查看了 Spring 文档:https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_configuration_options
并且有一些配置选项似乎是一些隐藏的重试逻辑实现,以防失败。例如 maxAttempts
参数。但是这个参数已经使用了默认值 3,但我没有看到 Spring 云流应用程序试图挽救这个错误。
所以我想知道为 Spring 云流应用程序构建错误输入容忍度的推荐方法是什么。
应用程序的配置如下所示:
spring:
cloud:
stream:
bindings:
input:
content-type: application/json
destination: inbound
group: fraud
consumer:
headerMode: raw
output:
content-type: application/x-spring-tuple
destination: outbound
producer:
headerMode: raw
useNativeEncoding: true
spring.cloud.stream.kstream.binder.configuration:
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
在 Spring Cloud Stream 1.3.x (Ditmars) 中,对 Kafka Streams 的错误处理支持非常有限。事实上,由应用程序来处理 1.3 kafka 流库中的任何错误。但是,在 2.0.0 中,我们添加了对 KIP-161 的支持。 https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
使用kafka streams binder 2.0.0版本的这个新特性,你可以logAndSkip记录或者logAndFail记录反序列化错误。除了这些,binder还提供了一个DLQ发送异常处理器的实现。所有这些的文档仍在 2.0 行上更新。准备就绪后,我将在此处更新文档链接。但是,这是它的要点。
spring.cloud.stream.kafka.streams.binder.serdeError: sendToDlq
(或 logAndFail 或 logAndSkip)
spring.cloud.stream.kafka.stream.bindings.input.consumer.dlqName:[dlq name]
- 如果未提供,则为 error.[incoming-topic].[group-name]
。
然后你会在DLQ主题中看到反序列化出错的记录。
同样,这些功能仅在 2.0.0.BUILD-SNAPSHOT 中可用,并且将成为即将发布的 2.0.0.RC1
版本的一部分。