具有解码器问题的 Kafka Avro 消费者

Kafka Avro Consumer with Decoder issues

当我尝试 运行 Kafka Consumer with Avro over the data with my respective schema,it returns an error of "AvroRuntimeException: Malformed data. Length is negative: -40" . I see others have had similar issues converting byte array to json, Avro write and read, and Kafka Avro Binary *coder. I have also referenced this Consumer Group Example 时,这些都对我有帮助,但到目前为止对这个错误没有任何帮助。 它一直工作到这部分代码(第 73 行)

解码器decoder = DecoderFactory.get().binaryDecoder(byteArrayInputStream, null);

我尝试了其他解码器并打印出 byteArrayInputStream 变量的内容,它看起来像我相信你期望序列化 avro 数据的样子(在消息中我可以看到模式和一些数据以及一些格式错误的数据)我使用 .available() 方法打印出可用字节,returns 594。我无法理解为什么会发生此错误。 Apache Nifi 用于从 hdfs 生成具有相同模式的 Kafka 流。如果有任何帮助,我将不胜感激。

也许问题在于 Nifi 写入(编码)Avro 数据的方式与您的消费者应用程序读取(解码)数据的方式不匹配。

简而言之,Avro 的 API 提供了两种不同的序列化方法:

  1. 为了创建合适的 Avro 文件:对数据记录进行编码,并将 Avro 模式嵌入到一种序言中(通过 org.apache.avro.file.{DataFileWriter/DataFileReader})。将模式嵌入到 Avro 文件中很有意义,因为 (a) 通常 Avro 文件的 "payload" 比嵌入的 Avro 模式大几个数量级,并且 (b) 然后您可以在您的位置复制或移动这些文件心的内容,并且仍然确保您可以再次阅读它们而无需咨询某人或某事。
  2. 仅对数据记录进行编码,即不嵌入模式(通过 org.apache.avro.io.{BinaryEncoder/BinaryDecoder};注意包名的区别:这里的 io 与上面的 file)。例如,当对写入 Kafka 主题的消息进行 Avro 编码时,这种方法通常很受欢迎,因为与上面的变体 1 相比,您不会产生将 Avro 模式重新嵌入每条消息的开销,假设您(非常合理)政策是,对于同一个 Kafka 主题,消息 formatted/encoded 具有相同的 Avro 模式。这是一个显着优势,因为在流数据上下文中,动态数据数据记录通常比上述静态数据 Avro 文件(通常为数百或数千 MB);所以 Avro schema 的大小比较大,因此你不想在向 Kafka 写入 2000 条数据记录时将它嵌入 2000x。缺点是您必须 "somehow" 跟踪 Avro 模式如何映射到 Kafka 主题——或者更准确地说,您必须以某种方式跟踪消息是使用哪个 Avro 模式编码的,而不是沿着直接嵌入模式的路径。好消息是 tooling available in the Kafka ecosystem (Avro schema registry) 可以透明地执行此操作。因此,与变体 1 相比,变体 2 以牺牲便利性为代价提高了效率。

效果是 "wire format" 编码的 Avro 数据会根据您使用上面的 (1) 还是 (2) 而看起来不同。

我对 Apache Nifi 不是很熟悉,但快速查看源代码(例如 ConvertAvroToJSON.java)告诉我它使用的是变体 1,即它在 Avro 记录旁边嵌入了 Avro 模式.但是,您的消费者代码使用 DecoderFactory.get().binaryDecoder(),因此使用变体 2(未嵌入模式)。

也许这可以解释您 运行 所犯的错误?