我可以使用 confluent Schema Registry 从平面文件生成无模式的 avro 消息吗?

Can I use confluent Schema Registry to generate schema less avro msgs from flat file?

我想知道是否可以使用 Confluent Schema 注册表来生成(然后将其发送到 kafka)无架构的 avro 记录?如果是,有人可以为此分享一些资源吗? 我无法在 Confluent 网站和 Google.

上找到任何示例

我有一个纯分隔文件,我有一个单独的模式,目前我正在使用 Avro 通用记录模式来序列化 Avro 记录并通过 Kafka 发送它。这样,架构仍然与记录相关联,这使得它更加庞大。我的逻辑是,如果我在从 kafka 发送记录时删除模式,我将能够获得更高的吞吐量。

Confluent Schema Registry 将发送序列化的 Avro 消息,消息中没有整个 Avro Schema。我认为这就是 "schema less" 消息的意思。

Confluent Schema Registry 将存储 Avro 模式,并且在线路上的消息中只包含一个短索引 ID。

包含用于测试 Confluent Schema Registry 的快速入门指南的完整文档在此处

http://docs.confluent.io/current/schema-registry/docs/index.html

您可以在 cmd

的以下命令的帮助下首次注册您的 avro 架构
curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
        --data '{"schema": "{\"type\": \"string\"}"}' \
        http://localhost:8081/subjects/topic

您可以使用

查看主题的所有版本
curl -X GET -i http://localhost:8081/subjects/topic/versions

要从汇合模式注册表中使用的所有版本中查看版本 1 的完整 Acro 模式,请使用以下命令,将以 json 格式显示模式

  curl -X GET -i http://localhost:8081/subjects/topica/versions/1

Avro 模式注册是 Kafka 生产者的任务

在 confluent schema registry 中有 schema 之后,你只需要将 avro 通用记录发布到特定的 kafka 主题,在我们的例子中是 'topic'

Kafka 消费者:使用下面的代码获取特定 Kafka 主题的最新模式

val schemaReg = new CachedSchemaRegistryClient(kafkaAvroSchemaRegistryUrl, 100)
val schemaMeta = schemaReg.getLatestSchemaMetadata(kafkaTopic + "-value")
val schema = schemaMeta.getSchema
val schema =new Schema.Parser().parse(schema)

以上将用于获取模式,然后我们可以使用 confluent 从 kafka 主题解码记录。