Apache NiFi ConsumeKafkaRecord_2_6: consuming messages from a topic where key and value are Avro-serialized using Confluent Schema Registry

I am using NiFi to build a data flow with the following setup:

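I am also using the ConsumeKafkaRecord_2_6 processor to consume messages from a topic where both the key and the value are Avro-serialized, with the key and value schemas stored in Confluent Schema Registry. The processor fails to parse the messages because, as far as I can see, there is no way to specify that both key and value are Avro-serialized with schemas stored in Confluent Schema Registry. The usual naming convention for the schemas is [topic name]-value and [topic name]-key. I can read the messages just fine with kcat (formerly kafkacat):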

kcat -b broker1:9092,broker2:9092,broker3:9092 -t mytopic -s avro -r http://schema-registry_url.com -p 0

Is there a way to read such messages, or do I have to add my own processor to NiFi? Here is the stack trace of the error:

   causes: org.apache.nifi.serialization.MalformedRecordException: Error while getting next record. Root cause: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -62
     org.apache.nifi.serialization.MalformedRecordException: Error while getting next record. Root cause: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -62
        at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:52)
        at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
        at sun.reflect.GeneratedMethodAccessor559.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:254)
        at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.access0(StandardControllerServiceInvocationHandler.java:38)
        at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:240)
        at com.sun.proxy.$Proxy192.nextRecord(Unknown Source)
        at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:549)
        at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.lambda$processRecords(ConsumerLease.java:342)
        at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
        at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
        at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.processRecords(ConsumerLease.java:329)
        at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:188)
        at org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6.onTrigger(ConsumeKafkaRecord_2_6.java:472)
        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1202)
        at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)
        at org.apache.nifi.controller.scheduling.QuartzSchedulingAgent.run(QuartzSchedulingAgent.java:137)
        at org.apache.nifi.engine.FlowEngine.run(FlowEngine.java:110)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
     Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -62
        at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
        at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:430)
        at org.apache.nifi.avro.NonCachingDatumReader.readString(NonCachingDatumReader.java:51)
        at org.apache.avro.generic.GenericDatumReader.readMapKey(GenericDatumReader.java:335)
        at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:321)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
        at org.apache.nifi.avro.AvroReaderWithExplicitSchema.nextAvroRecord(AvroReaderWithExplicitSchema.java:92)
        at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:39)
        ... 27 common frames omitted

I am attaching a picture of the processor.

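If the data is already being serialized correctly by a Confluent serializer, you should prefer the "Confluent Content-Encoded Schema Reference" option in the AvroReader. Because the schema ID is embedded in each record, the reader will resolve the correct subject/version.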
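To illustrate what "embedded in the record" means, here is a minimal Python sketch of the Confluent wire format: one magic byte (0), a 4-byte big-endian schema ID, then the Avro-encoded body. The function name `split_confluent_frame` and the sample bytes are made up for illustration; this is not NiFi's code, only a sketch of the framing the reader has to strip.

```python
import struct

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def split_confluent_frame(message: bytes):
    """Split a Confluent-framed message into (schema_id, avro_payload).

    Hypothetical helper for illustration only.
    """
    if len(message) < 5:
        raise ValueError("message too short to carry a Confluent header")
    # ">bI" = big-endian signed byte followed by unsigned 32-bit int (5 bytes)
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]


# Made-up sample: schema ID 42 followed by the Avro string "foo"
framed = b"\x00" + (42).to_bytes(4, "big") + b"\x06foo"
schema_id, payload = split_confluent_frame(framed)
print(schema_id, payload)
```

A reader that does not know about these five header bytes will try to decode them as part of the Avro body, so every field after that is read from the wrong offset.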

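Otherwise, using the "Schema Name" or "Schema Text" value will either perform a lookup against the registry or use a literal schema. However, the deserializer will still expect a certain content length for the record bytes, which seems to be the cause of the error Malformed data. Length is negative ...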
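For context on how a negative length can show up at all: Avro writes the length of a string or bytes field as a zigzag-encoded variable-length integer, so a decoder that starts reading at the wrong offset can easily land on a byte that decodes to a negative number. The sketch below is a plain-Python decoder written for illustration (`read_avro_long` is a hypothetical name, not an Avro library call). The actual bytes in your messages are unknown; 0x7B is simply one single-byte value that decodes to -62.

```python
def read_avro_long(buf: bytes, pos: int = 0):
    """Decode one Avro zigzag varint; return (value, next_position).

    Illustrative helper, not part of the Avro library.
    """
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift  # low 7 bits carry data
        if not byte & 0x80:            # high bit clear: last byte of the varint
            break
        shift += 7
    # Zigzag decoding maps 0, 1, 2, 3, ... to 0, -1, 1, -2, ...
    return (raw >> 1) ^ -(raw & 1), pos


print(read_avro_long(b"\x06"))  # a valid length prefix for a 3-byte string
print(read_avro_long(b"\x7b"))  # a misaligned byte that decodes to a negative length
```

So the exception does not necessarily mean the data is corrupt. It is also what you would see when valid data is decoded with the wrong framing or the wrong schema.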