Cloudflow 无法从 kafka 读取 avro 消息
Cloudflow is unable to read avro message from kafka
我正在使用 lightbend cloudflow 开发我的应用程序,该应用程序使用外部 kafka 主题。
外部 kafka 主题包含 avro 记录,如果我尝试将 kafka-avro-console-consumer 与 schema-regestry 结合使用,则能够获取消息。
但在同样的情况下,cloudflow 无法反序列化消息并抛出异常。
7:34:07.595 [consumer-akka.actor.default-dispatcher-4] ERROR cloudflow.streamlets.CodecInlet$ - Data decoding error, skipping message
com.twitter.bijection.InversionFailure: Failed to invert: [B@503e211a
at com.twitter.bijection.InversionFailure$$anonfun$partialFailure.applyOrElse(InversionFailure.scala:43)
at com.twitter.bijection.InversionFailure$$anonfun$partialFailure.applyOrElse(InversionFailure.scala:42)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at scala.util.Failure.recoverWith(Try.scala:236)
at com.twitter.bijection.Inversion$.attempt(Inversion.scala:32)
at com.twitter.bijection.avro.BinaryAvroCodec.invert(AvroCodecs.scala:330)
at com.twitter.bijection.avro.BinaryAvroCodec.invert(AvroCodecs.scala:320)
at cloudflow.streamlets.avro.AvroSerde.$anonfun$inverted(AvroCodec.scala:39)
at cloudflow.streamlets.avro.AvroSerde.decode(AvroCodec.scala:44)
at cloudflow.streamlets.avro.AvroCodec.decode(AvroCodec.scala:34)
at cloudflow.akkastream.AkkaStreamletContextImpl.$anonfun$sourceWithContext(AkkaStreamletContextImpl.scala:141)
at akka.stream.scaladsl.FlowWithContextOps.$anonfun$map(FlowWithContextOps.scala:70)
at akka.stream.impl.fusing.Map$$anon.onPush(Ops.scala:53)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:541)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:423)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:502)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:600)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:769)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive.applyOrElse(ActorGraphInterpreter.scala:784)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:691)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -25
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at com.twitter.bijection.avro.BinaryAvroCodec.$anonfun$invert(AvroCodecs.scala:332)
at com.twitter.bijection.Inversion$.$anonfun$attempt(Inversion.scala:32)
at scala.util.Try$.apply(Try.scala:213)
... 28 common frames omitted
17:34:07.595 [consumer-akka.actor.default-dispatcher-4] ERROR cloudflow.streamlets.CodecInlet$ - Data decoding error, skipping message
com.twitter.bijection.InversionFailure: Failed to invert: [B@5c9c541f
at com.twitter.bijection.InversionFailure$$anonfun$partialFailure.applyOrElse(InversionFailure.scala:43)
at com.twitter.bijection.InversionFailure$$anonfun$partialFailure.applyOrElse(InversionFailure.scala:42)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at scala.util.Failure.recoverWith(Try.scala:236)
at com.twitter.bijection.Inversion$.attempt(Inversion.scala:32)
at com.twitter.bijection.avro.BinaryAvroCodec.invert(AvroCodecs.scala:330)
at com.twitter.bijection.avro.BinaryAvroCodec.invert(AvroCodecs.scala:320)
at cloudflow.streamlets.avro.AvroSerde.$anonfun$inverted(AvroCodec.scala:39)
at cloudflow.streamlets.avro.AvroSerde.decode(AvroCodec.scala:44)
at cloudflow.streamlets.avro.AvroCodec.decode(AvroCodec.scala:34)
at cloudflow.akkastream.AkkaStreamletContextImpl.$anonfun$sourceWithContext(AkkaStreamletContextImpl.scala:141)
at akka.stream.scaladsl.FlowWithContextOps.$anonfun$map(FlowWithContextOps.scala:70)
at akka.stream.impl.fusing.Map$$anon.onPush(Ops.scala:53)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:541)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:423)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:502)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:600)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:769)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive.applyOrElse(ActorGraphInterpreter.scala:784)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:691)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -25
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at com.twitter.bijection.avro.BinaryAvroCodec.$anonfun$invert(AvroCodecs.scala:332)
at com.twitter.bijection.Inversion$.$anonfun$attempt(Inversion.scala:32)
at scala.util.Try$.apply(Try.scala:213)
... 28 common frames omitted
17:34:07.596 [consumer-akka.actor.default-dispatcher-4] ERROR cloudflow.streamlets.CodecInlet$ - Data decoding error, skipping message
com.twitter.bijection.InversionFailure: Failed to invert: [B@522b512f
at com.twitter.bijection.InversionFailure$$anonfun$partialFailure.applyOrElse(InversionFailure.scala:43)
at com.twitter.bijection.InversionFailure$$anonfun$partialFailure.applyOrElse(InversionFailure.scala:42)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at scala.util.Failure.recoverWith(Try.scala:236)
at com.twitter.bijection.Inversion$.attempt(Inversion.scala:32)
at com.twitter.bijection.avro.BinaryAvroCodec.invert(AvroCodecs.scala:330)
at com.twitter.bijection.avro.BinaryAvroCodec.invert(AvroCodecs.scala:320)
at cloudflow.streamlets.avro.AvroSerde.$anonfun$inverted(AvroCodec.scala:39)
at cloudflow.streamlets.avro.AvroSerde.decode(AvroCodec.scala:44)
at cloudflow.streamlets.avro.AvroCodec.decode(AvroCodec.scala:34)
at cloudflow.akkastream.AkkaStreamletContextImpl.$anonfun$sourceWithContext(AkkaStreamletContextImpl.scala:141)
at akka.stream.scaladsl.FlowWithContextOps.$anonfun$map(FlowWithContextOps.scala:70)
at akka.stream.impl.fusing.Map$$anon.onPush(Ops.scala:53)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:541)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:423)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:502)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:600)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:769)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive.applyOrElse(ActorGraphInterpreter.scala:784)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:691)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -25
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at com.twitter.bijection.avro.BinaryAvroCodec.$anonfun$invert(AvroCodecs.scala:332)
at com.twitter.bijection.Inversion$.$anonfun$attempt(Inversion.scala:32)
at scala.util.Try$.apply(Try.scala:213)
... 28 common frames omitted
com.twitter.bijection.avro.BinaryAvroCodec
不适用于 Confluent Schema Registry 格式。
您需要调整 Kafka 客户端的反序列化器设置以使用来自 Confluent
的适当 KafkaAvroDeserializer
class
我正在使用 lightbend cloudflow 开发我的应用程序,该应用程序使用外部 kafka 主题。
外部 kafka 主题包含 avro 记录,如果我尝试将 kafka-avro-console-consumer 与 schema-regestry 结合使用,则能够获取消息。
但在同样的情况下,cloudflow 无法反序列化消息并抛出异常。
7:34:07.595 [consumer-akka.actor.default-dispatcher-4] ERROR cloudflow.streamlets.CodecInlet$ - Data decoding error, skipping message
com.twitter.bijection.InversionFailure: Failed to invert: [B@503e211a
at com.twitter.bijection.InversionFailure$$anonfun$partialFailure.applyOrElse(InversionFailure.scala:43)
at com.twitter.bijection.InversionFailure$$anonfun$partialFailure.applyOrElse(InversionFailure.scala:42)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at scala.util.Failure.recoverWith(Try.scala:236)
at com.twitter.bijection.Inversion$.attempt(Inversion.scala:32)
at com.twitter.bijection.avro.BinaryAvroCodec.invert(AvroCodecs.scala:330)
at com.twitter.bijection.avro.BinaryAvroCodec.invert(AvroCodecs.scala:320)
at cloudflow.streamlets.avro.AvroSerde.$anonfun$inverted(AvroCodec.scala:39)
at cloudflow.streamlets.avro.AvroSerde.decode(AvroCodec.scala:44)
at cloudflow.streamlets.avro.AvroCodec.decode(AvroCodec.scala:34)
at cloudflow.akkastream.AkkaStreamletContextImpl.$anonfun$sourceWithContext(AkkaStreamletContextImpl.scala:141)
at akka.stream.scaladsl.FlowWithContextOps.$anonfun$map(FlowWithContextOps.scala:70)
at akka.stream.impl.fusing.Map$$anon.onPush(Ops.scala:53)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:541)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:423)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:502)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:600)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:769)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive.applyOrElse(ActorGraphInterpreter.scala:784)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:691)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -25
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at com.twitter.bijection.avro.BinaryAvroCodec.$anonfun$invert(AvroCodecs.scala:332)
at com.twitter.bijection.Inversion$.$anonfun$attempt(Inversion.scala:32)
at scala.util.Try$.apply(Try.scala:213)
... 28 common frames omitted
17:34:07.595 [consumer-akka.actor.default-dispatcher-4] ERROR cloudflow.streamlets.CodecInlet$ - Data decoding error, skipping message
com.twitter.bijection.InversionFailure: Failed to invert: [B@5c9c541f
at com.twitter.bijection.InversionFailure$$anonfun$partialFailure.applyOrElse(InversionFailure.scala:43)
at com.twitter.bijection.InversionFailure$$anonfun$partialFailure.applyOrElse(InversionFailure.scala:42)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at scala.util.Failure.recoverWith(Try.scala:236)
at com.twitter.bijection.Inversion$.attempt(Inversion.scala:32)
at com.twitter.bijection.avro.BinaryAvroCodec.invert(AvroCodecs.scala:330)
at com.twitter.bijection.avro.BinaryAvroCodec.invert(AvroCodecs.scala:320)
at cloudflow.streamlets.avro.AvroSerde.$anonfun$inverted(AvroCodec.scala:39)
at cloudflow.streamlets.avro.AvroSerde.decode(AvroCodec.scala:44)
at cloudflow.streamlets.avro.AvroCodec.decode(AvroCodec.scala:34)
at cloudflow.akkastream.AkkaStreamletContextImpl.$anonfun$sourceWithContext(AkkaStreamletContextImpl.scala:141)
at akka.stream.scaladsl.FlowWithContextOps.$anonfun$map(FlowWithContextOps.scala:70)
at akka.stream.impl.fusing.Map$$anon.onPush(Ops.scala:53)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:541)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:423)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:502)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:600)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:769)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive.applyOrElse(ActorGraphInterpreter.scala:784)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:691)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -25
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at com.twitter.bijection.avro.BinaryAvroCodec.$anonfun$invert(AvroCodecs.scala:332)
at com.twitter.bijection.Inversion$.$anonfun$attempt(Inversion.scala:32)
at scala.util.Try$.apply(Try.scala:213)
... 28 common frames omitted
17:34:07.596 [consumer-akka.actor.default-dispatcher-4] ERROR cloudflow.streamlets.CodecInlet$ - Data decoding error, skipping message
com.twitter.bijection.InversionFailure: Failed to invert: [B@522b512f
at com.twitter.bijection.InversionFailure$$anonfun$partialFailure.applyOrElse(InversionFailure.scala:43)
at com.twitter.bijection.InversionFailure$$anonfun$partialFailure.applyOrElse(InversionFailure.scala:42)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at scala.util.Failure.recoverWith(Try.scala:236)
at com.twitter.bijection.Inversion$.attempt(Inversion.scala:32)
at com.twitter.bijection.avro.BinaryAvroCodec.invert(AvroCodecs.scala:330)
at com.twitter.bijection.avro.BinaryAvroCodec.invert(AvroCodecs.scala:320)
at cloudflow.streamlets.avro.AvroSerde.$anonfun$inverted(AvroCodec.scala:39)
at cloudflow.streamlets.avro.AvroSerde.decode(AvroCodec.scala:44)
at cloudflow.streamlets.avro.AvroCodec.decode(AvroCodec.scala:34)
at cloudflow.akkastream.AkkaStreamletContextImpl.$anonfun$sourceWithContext(AkkaStreamletContextImpl.scala:141)
at akka.stream.scaladsl.FlowWithContextOps.$anonfun$map(FlowWithContextOps.scala:70)
at akka.stream.impl.fusing.Map$$anon.onPush(Ops.scala:53)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:541)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:423)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:502)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:600)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:769)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive.applyOrElse(ActorGraphInterpreter.scala:784)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:691)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -25
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at com.twitter.bijection.avro.BinaryAvroCodec.$anonfun$invert(AvroCodecs.scala:332)
at com.twitter.bijection.Inversion$.$anonfun$attempt(Inversion.scala:32)
at scala.util.Try$.apply(Try.scala:213)
... 28 common frames omitted
com.twitter.bijection.avro.BinaryAvroCodec
不适用于 Confluent Schema Registry 格式。
您需要调整 Kafka 客户端的反序列化器设置以使用来自 Confluent
的适当KafkaAvroDeserializer
class