KStream.map 中的空值导致 IllegalArgumentException:负载不得为空
Null value in KStream.map causing IllegalArgumentException: Payload must not be null
问题描述:
我正在创建一个 spring 云 Kafka 流应用程序。我有一个输入主题和一个输出主题,我正在尝试使用 KStream.map 函数
对输入主题应用 KStream 键值转换操作
如果我将转换后的值设为 null,则该函数会抛出 IllegalArgumentException
我的问题是:
1:异常的原因? 虽然在文档中说:"Input records with a null key or a null value are ignored"
2:在 stateless/stateful 操作中处理异常的最佳实践?一个 try/catch 环绕整个加工计划就够了吗?或者我应该在每个转换函数(例如 filter、map、join、reduce)中有一个 try/catch?
如有任何想法,我们将不胜感激。
应用程序配置:
spring:
application:
name:kafka-streams-test
cloud.stream:
kafka.streams:
binder:
brokers: localhost:9093
configuration:
commit.interval.ms: 1000
security.protocol: SASL_PLAINTEXT
sasl.mechanism: GSSAPI
sasl.kerberos.service.name: kafka
serdeError: logAndContinue
bindings:
streams-words-input:
consumer:
application-id: Input-Words
streams-words-output:
consumer:
application-id: Output-Words
bindings:
streams-words-input:
destination: streams-words-input
streams-words-output:
destination: streams-words-output
示例代码:
@StreamListener()
@SendTo("streams-words-output")
public KStream<String, Long> createWords(
@Input("streams-words-input") final KStream<String, String> wordsInput){
return wordsInput
.map((key,value) -> KeyValue.pair(key, null));
}
异常堆栈跟踪:
java.lang.IllegalArgumentException: Payload must not be null
at org.springframework.util.Assert.notNull(Assert.java:198)
at org.springframework.messaging.support.MessageBuilder.<init>(MessageBuilder.java:57)
at org.springframework.messaging.support.MessageBuilder.withPayload(MessageBuilder.java:179)
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate.lambda$serializeOnOutbound[=13=](KafkaStreamsMessageConversionDelegate.java:86)
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate$$Lambda3/1725151361.apply(Unknown Source)
at org.apache.kafka.streams.kstream.internals.AbstractStream.apply(AbstractStream.java:87)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:33)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:48)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:964)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
您看到的异常来自 Spring 消息传递长期存在的规则 - “没有消息具有空负载 ”。换句话说,如果没有什么可通信的,就没有要发送的消息。
也就是说,KStream 及其处理这种情况的方式显然存在问题,因此我建议在 Kafka binder 中提出问题。同时,您可以轻松地向管道添加 filter
操作以过滤掉空值。
问题描述:
我正在创建一个 spring 云 Kafka 流应用程序。我有一个输入主题和一个输出主题,我正在尝试使用 KStream.map 函数
对输入主题应用 KStream 键值转换操作
如果我将转换后的值设为 null,则该函数会抛出 IllegalArgumentException
我的问题是:
1:异常的原因? 虽然在文档中说:"Input records with a null key or a null value are ignored"
2:在 stateless/stateful 操作中处理异常的最佳实践?一个 try/catch 环绕整个加工计划就够了吗?或者我应该在每个转换函数(例如 filter、map、join、reduce)中有一个 try/catch?
如有任何想法,我们将不胜感激。
应用程序配置:
spring:
application:
name:kafka-streams-test
cloud.stream:
kafka.streams:
binder:
brokers: localhost:9093
configuration:
commit.interval.ms: 1000
security.protocol: SASL_PLAINTEXT
sasl.mechanism: GSSAPI
sasl.kerberos.service.name: kafka
serdeError: logAndContinue
bindings:
streams-words-input:
consumer:
application-id: Input-Words
streams-words-output:
consumer:
application-id: Output-Words
bindings:
streams-words-input:
destination: streams-words-input
streams-words-output:
destination: streams-words-output
示例代码:
@StreamListener()
@SendTo("streams-words-output")
public KStream<String, Long> createWords(
@Input("streams-words-input") final KStream<String, String> wordsInput){
return wordsInput
.map((key,value) -> KeyValue.pair(key, null));
}
异常堆栈跟踪:
java.lang.IllegalArgumentException: Payload must not be null
at org.springframework.util.Assert.notNull(Assert.java:198)
at org.springframework.messaging.support.MessageBuilder.<init>(MessageBuilder.java:57)
at org.springframework.messaging.support.MessageBuilder.withPayload(MessageBuilder.java:179)
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate.lambda$serializeOnOutbound[=13=](KafkaStreamsMessageConversionDelegate.java:86)
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate$$Lambda3/1725151361.apply(Unknown Source)
at org.apache.kafka.streams.kstream.internals.AbstractStream.apply(AbstractStream.java:87)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:33)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:48)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:964)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
您看到的异常来自 Spring 消息传递长期存在的规则 - “没有消息具有空负载 ”。换句话说,如果没有什么可通信的,就没有要发送的消息。
也就是说,KStream 及其处理这种情况的方式显然存在问题,因此我建议在 Kafka binder 中提出问题。同时,您可以轻松地向管道添加 filter
操作以过滤掉空值。