DataException:由于序列化错误,将 byte[] 转换为 Kafka Connect 数据失败
DataException: Converting byte[] to Kafka Connect data failed due to serialization error
我正在使用 Kafka connect 将数据从 Kafka broker 保存到使用 Confluent Platform 的弹性搜索。
我写了一个 SinkConnector 来持久化数据到弹性搜索。
Connect-avro-standalone.properties配置为:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
但是当我将有效的 json 从 avro 生产者推送到 Kafka 时,它会导致以下异常。
2016-10-05 15:02:25,225] ERROR Task kafka-elasticsearch-sink1-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:328)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:356)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: java.lang.ArrayIndexOutOfBoundsException: 6
Caused by: java.lang.ArrayIndexOutOfBoundsException: 6
at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:138)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.loadMore(ReaderBasedJsonParser.java:153)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:1854)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:571)
我在类路径 2.5.4 和 2.6.3 中有两个不同版本的 jackson-databind jar。从类路径中删除了 2.5.4 版本的 jar 并解决了这个问题。
我正在使用 Kafka connect 将数据从 Kafka broker 保存到使用 Confluent Platform 的弹性搜索。
我写了一个 SinkConnector 来持久化数据到弹性搜索。
Connect-avro-standalone.properties配置为:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
但是当我将有效的 json 从 avro 生产者推送到 Kafka 时,它会导致以下异常。
2016-10-05 15:02:25,225] ERROR Task kafka-elasticsearch-sink1-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:328)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:356)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: java.lang.ArrayIndexOutOfBoundsException: 6
Caused by: java.lang.ArrayIndexOutOfBoundsException: 6
at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:138)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.loadMore(ReaderBasedJsonParser.java:153)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:1854)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:571)
我在类路径 2.5.4 和 2.6.3 中有两个不同版本的 jackson-databind jar。从类路径中删除了 2.5.4 版本的 jar 并解决了这个问题。