如何在kafka中定义多个序列化器?

How to define multiple serializers in kafka?

比如说,我发布和使用不同类型的 java objects.For,每个我都必须定义自己的序列化程序实现。 我们如何在"serializer.class"属性下的kafkaconsumer/producer属性文件中提供所有实现?

一个选项是Avro。 Avro 允许您定义记录类型,然后您可以轻松地对其进行序列化和反序列化。

这是根据文档改编的示例架构:

{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number", "default": null, "type": ["null","int"]},
     {"name": "favorite_color", "default": null, "type": ["null","string"]}
 ]
}

Avro 区分所谓的 SpecificDataGenericData。使用 SpecificData 个读取器和写入器,您可以轻松地序列化和反序列化已知的 Java 个对象。缺点是 SpecificData 需要 class 到模式转换的编译时知识。

另一方面,GenericData 读取器和写入器让您可以处理编译时不知道的记录类型。虽然显然非常强大,但这可能会变得有点笨拙——您将不得不花时间在粗糙的边缘进行编码。

还有其他选择——我想到了 Thrift——但据我了解,主要区别之一是 Avro 能够使用 GenericData

另一个好处是多语言兼容性。我知道 Avro 在很多平台上对很多语言都有原生支持。我敢肯定,其他选项也是如此——就多语言支持而言,任何现成的选项都可能比推出自己的选项更好,这只是程度的问题。

我们有一个类似的设置,在不同的主题中使用不同的对象,但在一个主题中始终使用相同的对象类型。我们使用ByteArrayDeserializer that comes with the Java API 0.9.0.1, which means or message consumers get only ever a byte[] as the value part of the message (we consistently use String for the keys). The first thing the topic-specific message consumer does is to call the right deserializer to convert the byte[]. You could use a apache commons helper class。够简单了。

如果您更愿意让 KafkaConsumer 为您进行反序列化,您当然可以编写自己的 Deserializer。您需要实现的 deserialize 方法将主题作为第一个参数。将它用作提供必要的反序列化器的映射的键,然后你就可以开始了。我的直觉是,在大多数情况下,无论如何你都会做一个正常的 Java 反序列化。

第二种方法的缺点是您需要一个通用的超级 class 来让您的所有消息对象能够正确地参数化 ConsumerRecord<K,V>。然而,对于第一种方法,它仍然是 ConsumerRecord<String, byte[]> 。但是随后您将 byte[] 转换为您需要的对象,就在正确的位置,并且只需要在那里进行一次转换。