KafkaAvroSerializer 用于在没有 schema.registry.url 的情况下序列化 Avro
KafkaAvroSerializer for serializing Avro without schema.registry.url
我是 Kafka 和 Avro 的菜鸟。所以我一直在努力得到Producer/Consumer运行。到目前为止,我已经能够使用以下方法生成和使用简单的字节和字符串:
生产者的配置:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "Str 1-" + i);
avroRecord.put("str2", "Str 2-" + i);
avroRecord.put("int1", i);
byte[] bytes = recordInjection.apply(avroRecord);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
producer.send(record);
Thread.sleep(250);
}
producer.close();
}
现在一切都很好,当我尝试序列化 POJO 时问题就来了。
因此,我能够使用 Avro 提供的实用程序从 POJO 获取 AvroSchema。
对模式进行硬编码,然后尝试创建一个通用记录以通过 KafkaProducer 发送
生产者现在设置为:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.KafkaAvroSerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA); // this is the Generated AvroSchema
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
这就是问题所在:当我使用 KafkaAvroSerializer 时,生产者没有出现是因为:
缺少强制参数:schema.registry.url
我仔细阅读了为什么需要这样做,以便我的消费者能够破译生产者发送给我的任何内容。
但是模式不是已经嵌入到 AvroMessage 中了吗?
如果有人可以分享一个使用 KafkaProducer 和 KafkaAvroSerializer 的工作示例而无需指定 schema.registry.url
,那就太好了
也非常感谢 insights/resources 架构注册表的实用程序。
谢谢!
首先注意:KafkaAvroSerializer
没有在 vanilla apache kafka 中提供——它是由 Confluent Platform 提供的。 (https://www.confluent.io/), as part of its open source components (http://docs.confluent.io/current/platform.html#confluent-schema-registry)
快速回答:不,如果您使用 KafkaAvroSerializer
,您将需要一个架构注册表。在此处查看一些示例:
http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
模式注册表的基本思想是每个主题将引用一个avro模式(即,您只能发送彼此一致的数据。但是一个模式可以有多个版本,所以您仍然需要识别每条记录的模式)
我们不想像您暗示的那样为每个数据编写架构 - 通常,架构比您的数据大!每次看书都要解析,很浪费时间,也很浪费资源(网络,磁盘,cpu)
相反,模式注册表实例将执行绑定 avro schema <-> int schemaId
,然后序列化器将从注册表中获取数据(并将其缓存以备后用)后仅在数据前写入此 ID。
因此在 kafka 中,您的记录将是 [<id> <bytesavro>]
(出于技术原因,还有魔法字节),这仅是 5 个字节的开销(与您的模式的大小相比)
在阅读时,您的用户会找到与 id 对应的模式,以及与之相关的反序列化器 avro 字节。您可以在 confluent doc
中找到更多方法
如果你真的想为每条记录编写模式,你将需要一个其他的序列化程序(我想写你自己的,但这很容易,只需重用 https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java 并删除模式注册表部分将其替换为模式,与阅读相同)。但是如果你使用 avro,我真的不鼓励这样做 - 有一天,你将需要实现类似 avro registry 的东西来管理版本控制
虽然检查的答案都是正确的,但还应该提到 可以禁用模式注册。
只需将 auto.register.schemas
设置为 false
。
您始终可以使您的值 类 手动实现 Serialiser<T>
、Deserialiser<T>
(以及 Serde<T>
用于 Kafka Streams)。 Java 类 通常是从 Avro 文件生成的,因此直接编辑它不是一个好主意,但换行可能会很冗长,但也是可行的方法。
另一种方法是调整用于 Java 类 生成的 Arvo 生成器模板,并自动生成所有这些接口的实现。 Avro maven 和 gradle 插件都支持自定义模板,所以应该很容易配置。
我创建了 https://github.com/artemyarulin/avro-kafka-deserializable,它更改了模板文件和可用于文件生成的简单 CLI 工具
您可以创建自定义 Avro 序列化程序,这样即使没有 Schema 注册表,您也可以为主题生成记录。查看下面的文章。
https://codenotfound.com/spring-kafka-apache-avro-serializer-deserializer-example.html
这里他们使用了 Kafkatemplate 。我试过使用
KafkaProducer<String, User> UserKafkaProducer
工作正常
但是如果要使用KafkaAvroSerialiser,需要给Schema registryURL
正如其他人所指出的,KafkaAvroSerializer 需要 Schema Registry,它是 Confluent 平台的一部分,并且使用需要许可。
使用模式注册表的主要优点是您的在线字节会更小,而不是为每条消息编写带有模式的二进制负载。
我写了一篇blog post详细介绍了优点
我是 Kafka 和 Avro 的菜鸟。所以我一直在努力得到Producer/Consumer运行。到目前为止,我已经能够使用以下方法生成和使用简单的字节和字符串: 生产者的配置:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "Str 1-" + i);
avroRecord.put("str2", "Str 2-" + i);
avroRecord.put("int1", i);
byte[] bytes = recordInjection.apply(avroRecord);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
producer.send(record);
Thread.sleep(250);
}
producer.close();
}
现在一切都很好,当我尝试序列化 POJO 时问题就来了。 因此,我能够使用 Avro 提供的实用程序从 POJO 获取 AvroSchema。 对模式进行硬编码,然后尝试创建一个通用记录以通过 KafkaProducer 发送 生产者现在设置为:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.KafkaAvroSerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA); // this is the Generated AvroSchema
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
这就是问题所在:当我使用 KafkaAvroSerializer 时,生产者没有出现是因为: 缺少强制参数:schema.registry.url
我仔细阅读了为什么需要这样做,以便我的消费者能够破译生产者发送给我的任何内容。 但是模式不是已经嵌入到 AvroMessage 中了吗? 如果有人可以分享一个使用 KafkaProducer 和 KafkaAvroSerializer 的工作示例而无需指定 schema.registry.url
,那就太好了也非常感谢 insights/resources 架构注册表的实用程序。
谢谢!
首先注意:KafkaAvroSerializer
没有在 vanilla apache kafka 中提供——它是由 Confluent Platform 提供的。 (https://www.confluent.io/), as part of its open source components (http://docs.confluent.io/current/platform.html#confluent-schema-registry)
快速回答:不,如果您使用 KafkaAvroSerializer
,您将需要一个架构注册表。在此处查看一些示例:
http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
模式注册表的基本思想是每个主题将引用一个avro模式(即,您只能发送彼此一致的数据。但是一个模式可以有多个版本,所以您仍然需要识别每条记录的模式)
我们不想像您暗示的那样为每个数据编写架构 - 通常,架构比您的数据大!每次看书都要解析,很浪费时间,也很浪费资源(网络,磁盘,cpu)
相反,模式注册表实例将执行绑定 avro schema <-> int schemaId
,然后序列化器将从注册表中获取数据(并将其缓存以备后用)后仅在数据前写入此 ID。
因此在 kafka 中,您的记录将是 [<id> <bytesavro>]
(出于技术原因,还有魔法字节),这仅是 5 个字节的开销(与您的模式的大小相比)
在阅读时,您的用户会找到与 id 对应的模式,以及与之相关的反序列化器 avro 字节。您可以在 confluent doc
如果你真的想为每条记录编写模式,你将需要一个其他的序列化程序(我想写你自己的,但这很容易,只需重用 https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java 并删除模式注册表部分将其替换为模式,与阅读相同)。但是如果你使用 avro,我真的不鼓励这样做 - 有一天,你将需要实现类似 avro registry 的东西来管理版本控制
虽然检查的答案都是正确的,但还应该提到 可以禁用模式注册。
只需将 auto.register.schemas
设置为 false
。
您始终可以使您的值 类 手动实现 Serialiser<T>
、Deserialiser<T>
(以及 Serde<T>
用于 Kafka Streams)。 Java 类 通常是从 Avro 文件生成的,因此直接编辑它不是一个好主意,但换行可能会很冗长,但也是可行的方法。
另一种方法是调整用于 Java 类 生成的 Arvo 生成器模板,并自动生成所有这些接口的实现。 Avro maven 和 gradle 插件都支持自定义模板,所以应该很容易配置。
我创建了 https://github.com/artemyarulin/avro-kafka-deserializable,它更改了模板文件和可用于文件生成的简单 CLI 工具
您可以创建自定义 Avro 序列化程序,这样即使没有 Schema 注册表,您也可以为主题生成记录。查看下面的文章。
https://codenotfound.com/spring-kafka-apache-avro-serializer-deserializer-example.html
这里他们使用了 Kafkatemplate 。我试过使用
KafkaProducer<String, User> UserKafkaProducer
工作正常 但是如果要使用KafkaAvroSerialiser,需要给Schema registryURL
正如其他人所指出的,KafkaAvroSerializer 需要 Schema Registry,它是 Confluent 平台的一部分,并且使用需要许可。
使用模式注册表的主要优点是您的在线字节会更小,而不是为每条消息编写带有模式的二进制负载。
我写了一篇blog post详细介绍了优点