spring-kafka-test 中的 ClassCastException 使用 `merger()`
ClassCastException in spring-kafka-test using `merger()`
我想使用 kafka-streams-test-utils 通过单元测试来测试我的 Kafka Streams 拓扑。我已经使用这个库很长时间了,并且我已经使用 TestNG 围绕我的测试构建了一些抽象层。
但是因为我在我的 Stream 中添加了一个 merge(...)
,所以我得到了以下异常:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=my-topic-2, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:318)
at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:393)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: com.MyKey / value type: com.MyValue). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
... 3 more
Caused by: java.lang.ClassCastException: class com.MyKey cannot be cast to class [B (com.MyValue is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:156)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:101)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 15 more
这是我如何使用 TopologyTestDriver 的 StreamBuilder 构建 Stream 的部分:
// Block 1
KStream<MyKey, MyValue> stream2 = streamsBuilder.stream(
"my-topic-2",
consumedAs(OtherKey.class, OtherValue.class, AllowEmpty.NONE) // Provides default json Serde
).flatMap(
(key, value) -> {
List<KeyValue<MyKey, MyValue>> list = new ArrayList<>();
// Do stuff an fill out the list
return list;
})
.through("tmp-topic");
// Block 2
KStream<MyKey, MyValue>[] branches = stream1
.merge(stream2)
... business stuff
为了生成有关源主题的消息,我正在使用 TopologyTestDriver.pipeInput(...)
用 JsonSerDes 初始化。
异常是通过转换 ByteArray 发生的,但我不知道为什么 ByteArraySerializer 的预期参数是相同的 class 但来自另一个模块而不是消耗的 class 加载。它们也可能由另一个类加载器加载。但是后台没有 Spring 堆栈,一切都应该同步运行。
我对这种行为感到很困惑。
Apache Kafka 依赖项的版本为:2.0.1,我正在使用 openjdk-11。是否可以对齐序列化程序的 class 加载?
仅当我在 my-topic-2 上生成内容时才会发生错误,合并的另一个主题工作正常。
没有看到您的所有代码,我不能肯定地说,但我认为可能会发生这种情况。
为 Serdes 提供 Consumed
只在消费来自输入主题的记录时提供 de/serialization; Kafka Streams 不会将它们传播到拓扑的其余部分。在任何时候,如果再次需要 Serde,Kafka Streams 会使用 StreamsConfig
中提供的那些。 Serdes.ByteArraySerde
是默认值。
我建议尝试两件事:
- 在您的接收器节点中使用
Produced.with(keySerde, valueSerde)
- 通过
StreamsConfig
. 为您的类型提供 Serde
HTH,让我知道结果如何。
-比尔
如@bbejeck 所述,您需要使用 different version of .through()
,它允许您覆盖应用于 K, V
.[=19 的默认 (ByteArraySerde
) serdes =]
KStream<K,V> through(java.lang.String topic,
Produced<K,V> produced)
Materialize this stream to a topic and creates a new KStream from the topic using the Produced instance for configuration of the key serde
, value serde
, and StreamPartitioner. ... This is equivalent to calling to(someTopic, Produced.with(keySerde, valueSerde)
and StreamsBuilder#stream(someTopicName, Consumed.with(keySerde, valueSerde)).
我想使用 kafka-streams-test-utils 通过单元测试来测试我的 Kafka Streams 拓扑。我已经使用这个库很长时间了,并且我已经使用 TestNG 围绕我的测试构建了一些抽象层。
但是因为我在我的 Stream 中添加了一个 merge(...)
,所以我得到了以下异常:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=my-topic-2, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:318)
at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:393)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: com.MyKey / value type: com.MyValue). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
... 3 more
Caused by: java.lang.ClassCastException: class com.MyKey cannot be cast to class [B (com.MyValue is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:156)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:101)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 15 more
这是我如何使用 TopologyTestDriver 的 StreamBuilder 构建 Stream 的部分:
// Block 1
KStream<MyKey, MyValue> stream2 = streamsBuilder.stream(
"my-topic-2",
consumedAs(OtherKey.class, OtherValue.class, AllowEmpty.NONE) // Provides default json Serde
).flatMap(
(key, value) -> {
List<KeyValue<MyKey, MyValue>> list = new ArrayList<>();
// Do stuff an fill out the list
return list;
})
.through("tmp-topic");
// Block 2
KStream<MyKey, MyValue>[] branches = stream1
.merge(stream2)
... business stuff
为了生成有关源主题的消息,我正在使用 TopologyTestDriver.pipeInput(...)
用 JsonSerDes 初始化。
异常是通过转换 ByteArray 发生的,但我不知道为什么 ByteArraySerializer 的预期参数是相同的 class 但来自另一个模块而不是消耗的 class 加载。它们也可能由另一个类加载器加载。但是后台没有 Spring 堆栈,一切都应该同步运行。
我对这种行为感到很困惑。
Apache Kafka 依赖项的版本为:2.0.1,我正在使用 openjdk-11。是否可以对齐序列化程序的 class 加载? 仅当我在 my-topic-2 上生成内容时才会发生错误,合并的另一个主题工作正常。
没有看到您的所有代码,我不能肯定地说,但我认为可能会发生这种情况。
为 Serdes 提供 Consumed
只在消费来自输入主题的记录时提供 de/serialization; Kafka Streams 不会将它们传播到拓扑的其余部分。在任何时候,如果再次需要 Serde,Kafka Streams 会使用 StreamsConfig
中提供的那些。 Serdes.ByteArraySerde
是默认值。
我建议尝试两件事:
- 在您的接收器节点中使用
Produced.with(keySerde, valueSerde)
- 通过
StreamsConfig
. 为您的类型提供 Serde
HTH,让我知道结果如何。
-比尔
如@bbejeck 所述,您需要使用 different version of .through()
,它允许您覆盖应用于 K, V
.[=19 的默认 (ByteArraySerde
) serdes =]
KStream<K,V> through(java.lang.String topic, Produced<K,V> produced)
Materialize this stream to a topic and creates a new KStream from the topic using the Produced instance for configuration of the
key serde
,value serde
, and StreamPartitioner. ... This is equivalent to callingto(someTopic, Produced.with(keySerde, valueSerde)
and StreamsBuilder#stream(someTopicName, Consumed.with(keySerde, valueSerde)).