具有解码器问题的 Kafka Avro 消费者
Kafka Avro Consumer with Decoder issues
当我尝试 运行 Kafka Consumer with Avro over the data with my respective schema,it returns an error of "AvroRuntimeException: Malformed data. Length is negative: -40" . I see others have had similar issues converting byte array to json, Avro write and read, and Kafka Avro Binary *coder. I have also referenced this Consumer Group Example 时,这些都对我有帮助,但到目前为止对这个错误没有任何帮助。
它一直工作到这部分代码(第 73 行)
解码器decoder = DecoderFactory.get().binaryDecoder(byteArrayInputStream, null);
我尝试了其他解码器并打印出 byteArrayInputStream 变量的内容,它看起来像我相信你期望序列化 avro 数据的样子(在消息中我可以看到模式和一些数据以及一些格式错误的数据)我使用 .available() 方法打印出可用字节,returns 594。我无法理解为什么会发生此错误。 Apache Nifi 用于从 hdfs 生成具有相同模式的 Kafka 流。如果有任何帮助,我将不胜感激。
也许问题在于 Nifi 写入(编码)Avro 数据的方式与您的消费者应用程序读取(解码)数据的方式不匹配。
简而言之,Avro 的 API 提供了两种不同的序列化方法:
- 为了创建合适的 Avro 文件:对数据记录进行编码,并将 Avro 模式嵌入到一种序言中(通过
org.apache.avro.file.{DataFileWriter/DataFileReader}
)。将模式嵌入到 Avro 文件中很有意义,因为 (a) 通常 Avro 文件的 "payload" 比嵌入的 Avro 模式大几个数量级,并且 (b) 然后您可以在您的位置复制或移动这些文件心的内容,并且仍然确保您可以再次阅读它们而无需咨询某人或某事。
- 仅对数据记录进行编码,即不嵌入模式(通过
org.apache.avro.io.{BinaryEncoder/BinaryDecoder}
;注意包名的区别:这里的 io
与上面的 file
)。例如,当对写入 Kafka 主题的消息进行 Avro 编码时,这种方法通常很受欢迎,因为与上面的变体 1 相比,您不会产生将 Avro 模式重新嵌入每条消息的开销,假设您(非常合理)政策是,对于同一个 Kafka 主题,消息 formatted/encoded 具有相同的 Avro 模式。这是一个显着优势,因为在流数据上下文中,动态数据数据记录通常比上述静态数据 Avro 文件(通常为数百或数千 MB);所以 Avro schema 的大小比较大,因此你不想在向 Kafka 写入 2000 条数据记录时将它嵌入 2000x。缺点是您必须 "somehow" 跟踪 Avro 模式如何映射到 Kafka 主题——或者更准确地说,您必须以某种方式跟踪消息是使用哪个 Avro 模式编码的,而不是沿着直接嵌入模式的路径。好消息是 tooling available in the Kafka ecosystem (Avro schema registry) 可以透明地执行此操作。因此,与变体 1 相比,变体 2 以牺牲便利性为代价提高了效率。
效果是 "wire format" 编码的 Avro 数据会根据您使用上面的 (1) 还是 (2) 而看起来不同。
我对 Apache Nifi 不是很熟悉,但快速查看源代码(例如 ConvertAvroToJSON.java)告诉我它使用的是变体 1,即它在 Avro 记录旁边嵌入了 Avro 模式.但是,您的消费者代码使用 DecoderFactory.get().binaryDecoder()
,因此使用变体 2(未嵌入模式)。
也许这可以解释您 运行 所犯的错误?
当我尝试 运行 Kafka Consumer with Avro over the data with my respective schema,it returns an error of "AvroRuntimeException: Malformed data. Length is negative: -40" . I see others have had similar issues converting byte array to json, Avro write and read, and Kafka Avro Binary *coder. I have also referenced this Consumer Group Example 时,这些都对我有帮助,但到目前为止对这个错误没有任何帮助。 它一直工作到这部分代码(第 73 行)
解码器decoder = DecoderFactory.get().binaryDecoder(byteArrayInputStream, null);
我尝试了其他解码器并打印出 byteArrayInputStream 变量的内容,它看起来像我相信你期望序列化 avro 数据的样子(在消息中我可以看到模式和一些数据以及一些格式错误的数据)我使用 .available() 方法打印出可用字节,returns 594。我无法理解为什么会发生此错误。 Apache Nifi 用于从 hdfs 生成具有相同模式的 Kafka 流。如果有任何帮助,我将不胜感激。
也许问题在于 Nifi 写入(编码)Avro 数据的方式与您的消费者应用程序读取(解码)数据的方式不匹配。
简而言之,Avro 的 API 提供了两种不同的序列化方法:
- 为了创建合适的 Avro 文件:对数据记录进行编码,并将 Avro 模式嵌入到一种序言中(通过
org.apache.avro.file.{DataFileWriter/DataFileReader}
)。将模式嵌入到 Avro 文件中很有意义,因为 (a) 通常 Avro 文件的 "payload" 比嵌入的 Avro 模式大几个数量级,并且 (b) 然后您可以在您的位置复制或移动这些文件心的内容,并且仍然确保您可以再次阅读它们而无需咨询某人或某事。 - 仅对数据记录进行编码,即不嵌入模式(通过
org.apache.avro.io.{BinaryEncoder/BinaryDecoder}
;注意包名的区别:这里的io
与上面的file
)。例如,当对写入 Kafka 主题的消息进行 Avro 编码时,这种方法通常很受欢迎,因为与上面的变体 1 相比,您不会产生将 Avro 模式重新嵌入每条消息的开销,假设您(非常合理)政策是,对于同一个 Kafka 主题,消息 formatted/encoded 具有相同的 Avro 模式。这是一个显着优势,因为在流数据上下文中,动态数据数据记录通常比上述静态数据 Avro 文件(通常为数百或数千 MB);所以 Avro schema 的大小比较大,因此你不想在向 Kafka 写入 2000 条数据记录时将它嵌入 2000x。缺点是您必须 "somehow" 跟踪 Avro 模式如何映射到 Kafka 主题——或者更准确地说,您必须以某种方式跟踪消息是使用哪个 Avro 模式编码的,而不是沿着直接嵌入模式的路径。好消息是 tooling available in the Kafka ecosystem (Avro schema registry) 可以透明地执行此操作。因此,与变体 1 相比,变体 2 以牺牲便利性为代价提高了效率。
效果是 "wire format" 编码的 Avro 数据会根据您使用上面的 (1) 还是 (2) 而看起来不同。
我对 Apache Nifi 不是很熟悉,但快速查看源代码(例如 ConvertAvroToJSON.java)告诉我它使用的是变体 1,即它在 Avro 记录旁边嵌入了 Avro 模式.但是,您的消费者代码使用 DecoderFactory.get().binaryDecoder()
,因此使用变体 2(未嵌入模式)。
也许这可以解释您 运行 所犯的错误?