Exception when processing data during Kafka stream processing
I am processing a Kafka stream with the code below. The filter checks whether the JSON object contains "UserID":"1". Please see the following code:
builder.<String, String>stream(Serdes.String(), Serdes.String(), topic)
    .filter(new Predicate<String, String>() {
        @Override
        public boolean test(String key, String value) {
            try {
                JSONObject jsonObj = new JSONObject(value);
                String userIDCheck = jsonObj.get("UserID").toString();
                System.out.println("userIDCheck: " + userIDCheck);
                return userIDCheck.equals("1");
            } catch (JSONException e) {
                e.printStackTrace();
                return false; // drop records that are not valid JSON
            }
        }
    })
    .to(streamouttopic);
Value: {"UserID":"1","Address":"XXX","AccountNo":"989","UserName":"Stella","AccountType":"YYY"}
I am getting the following error:
Exception in thread "SampleStreamProducer-4eecc3ab-858c-44a4-9b8c-5ece2b4ab21a-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=testtopic1, partition=0, offset=270
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:91)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:47)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
... 3 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:89)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
The value and the filter condition in the stream code above look fine, and I don't understand why this exception occurs when the stream code runs.
The reported issue should only apply to Kafka 2.0 and earlier. As of version 2.1.0, Kafka Streams supports "serde push down", and the to() operator should inherit the correct serdes from the upstream operators (see https://issues.apache.org/jira/browse/KAFKA-7456).
For Kafka 2.0 and earlier, you must explicitly specify the correct serdes for the to() operation. Otherwise, it uses the default serdes from StreamsConfig, which is ByteArraySerde (because the semantics of serde overwrites are a per-operator "direct overwrite"), and a String cannot be cast to byte[].
What you need to do:
.to(streamoutputtopic, Produced.with(Serdes.String(), Serdes.String()));
For even older versions (pre-1.0) that do not have the Produced parameter yet, the code would be:
.to(Serdes.String(), Serdes.String(), streamoutputtopic);
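Putting the fix together, here is a minimal sketch of the corrected topology for the 1.0–2.0 API (the output topic name and the org.json-based parsing are assumptions carried over from the question; the source topic name is taken from the stack trace):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.json.JSONException;
import org.json.JSONObject;

public class FilterByUserId {
    public static void main(String[] args) {
        String topic = "testtopic1";         // source topic, from the stack trace
        String streamouttopic = "outtopic";  // assumed output topic name

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream(topic,
                Consumed.with(Serdes.String(), Serdes.String()))
            .filter((key, value) -> {
                try {
                    JSONObject jsonObj = new JSONObject(value);
                    return "1".equals(jsonObj.get("UserID").toString());
                } catch (JSONException e) {
                    e.printStackTrace();
                    return false; // drop records that are not valid JSON
                }
            })
            // The fix: give to() explicit String serdes instead of letting it
            // fall back to the default ByteArraySerde from StreamsConfig.
            .to(streamouttopic, Produced.with(Serdes.String(), Serdes.String()));
    }
}
```

With the explicit Produced serdes, the sink node serializes the String values directly instead of attempting the String-to-byte[] cast that produced the ClassCastException.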