使用 kafka-json-schema-console-producer 生成具有键模式和值模式的消息
Using kafka-json-schema-console-producer to produce message with a key schema and a value schema
我正在尝试使用 kafka-json-schema-console-producer 发布包含 both 一个键(带有模式)和一个值(与模式)。不幸的是,我找不到满足我要求的示例。
我可以按照文档发送简单的消息:
kafka-json-schema-console-producer \
--broker-list localhost:9092 \
--topic some-topic \
--property value.schema='
{
"definitions" : {
"record:myrecord" : {
"type" : "object",
"required" : [ "name", "calories" ],
"additionalProperties" : false,
"properties" : {
"name" : {"type" : "string"},
"calories" : {"type" : "number"},
"colour" : {"type" : "string"}
}
}
},
"$ref" : "#/definitions/record:myrecord"
}' < snacks.txt
两个问题:
- 如何添加密钥架构?是否像添加“key.schema”并使用与 value.schema 类似的语法一样简单?
- 使用键模式和值模式发送 JSON 消息的实际命令是什么样的?
是,添加--property key.schema
。对于磁盘上的 jsonschema 文件,也有 key.schema.file
选项,对于注册表中已有的 ID,也有 key.schema.id
选项。
-
提取(从 v6.2.0 开始)
* bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic t1 \
* --line-reader io.confluent.kafka.formatter.JsonSchemaMessageReader \
* --property schema.registry.url=http://localhost:8081 \
* --property parse.key=true \
* --property key.schema='{"type":"string"}' \
* --property value.schema='{"type":"object","properties":{"f1":{"type":"string"}}}'
* "type":"string"}]}'
kafka-json-schema-console-producer
是 shorthand 对于
kafka-console-producer --line-reader io.confluent.kafka.formatter.JsonSchemaMessageReader
感谢@OneCricketeer 的帮助,我才得以完成这项工作。下面是一个完整的示例,它将发送带有键和值的 JSON 架构的消息。 (顺便说一句:我展示了一个使用 Docker 的解决方案,但将其修改为不使用 Docker 应该很简单)
请注意,有几件事让我感到困惑:
- 您将需要“key.parse=true”属性(如果省略,则不会解析密钥,您将收到错误消息,或者在使用留言)
- 键定义看起来与值定义相同,但需要与值不同的 definition/record 名称(对于值:“definition/record:myrecord”和键:“definition/record:我的钥匙
docker exec -it schema-registry \
/usr/bin/kafka-json-schema-console-producer \
--broker-list http://kafka:9092 \
--topic source-1 \
--property key.separator="|" \
--property value.schema='
{
"definitions" : {
"record:myrecord" : {
"type" : "object",
"required" : [ "name", "calories" ],
"properties" : {
"name" : {"type" : "string"},
"calories" : {"type" : "number"},
"colour" : {"type" : "string"}
}
}
},
"$ref" : "#/definitions/record:myrecord"
}' \
--property parse.key=true \
--property key.schema='
{
"definitions" : {
"record:mykey" : {
"type" : "object",
"required" : [ "id" ],
"additionalProperties" : false,
"properties" : {
"id" : {"type" : "integer"}
}
}
},
"$ref" : "#/definitions/record:mykey"
和数据:
{"id":1} | {"timestamp":"foo", "data":"bar"}
我正在尝试使用 kafka-json-schema-console-producer 发布包含 both 一个键(带有模式)和一个值(与模式)。不幸的是,我找不到满足我要求的示例。
我可以按照文档发送简单的消息:
kafka-json-schema-console-producer \
--broker-list localhost:9092 \
--topic some-topic \
--property value.schema='
{
"definitions" : {
"record:myrecord" : {
"type" : "object",
"required" : [ "name", "calories" ],
"additionalProperties" : false,
"properties" : {
"name" : {"type" : "string"},
"calories" : {"type" : "number"},
"colour" : {"type" : "string"}
}
}
},
"$ref" : "#/definitions/record:myrecord"
}' < snacks.txt
两个问题:
- 如何添加密钥架构?是否像添加“key.schema”并使用与 value.schema 类似的语法一样简单?
- 使用键模式和值模式发送 JSON 消息的实际命令是什么样的?
是,添加
--property key.schema
。对于磁盘上的 jsonschema 文件,也有key.schema.file
选项,对于注册表中已有的 ID,也有key.schema.id
选项。
提取(从 v6.2.0 开始)
* bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic t1 \
* --line-reader io.confluent.kafka.formatter.JsonSchemaMessageReader \
* --property schema.registry.url=http://localhost:8081 \
* --property parse.key=true \
* --property key.schema='{"type":"string"}' \
* --property value.schema='{"type":"object","properties":{"f1":{"type":"string"}}}'
* "type":"string"}]}'
kafka-json-schema-console-producer
是 shorthand 对于
kafka-console-producer --line-reader io.confluent.kafka.formatter.JsonSchemaMessageReader
感谢@OneCricketeer 的帮助,我才得以完成这项工作。下面是一个完整的示例,它将发送带有键和值的 JSON 架构的消息。 (顺便说一句:我展示了一个使用 Docker 的解决方案,但将其修改为不使用 Docker 应该很简单)
请注意,有几件事让我感到困惑:
- 您将需要“key.parse=true”属性(如果省略,则不会解析密钥,您将收到错误消息,或者在使用留言)
- 键定义看起来与值定义相同,但需要与值不同的 definition/record 名称(对于值:“definition/record:myrecord”和键:“definition/record:我的钥匙
docker exec -it schema-registry \
/usr/bin/kafka-json-schema-console-producer \
--broker-list http://kafka:9092 \
--topic source-1 \
--property key.separator="|" \
--property value.schema='
{
"definitions" : {
"record:myrecord" : {
"type" : "object",
"required" : [ "name", "calories" ],
"properties" : {
"name" : {"type" : "string"},
"calories" : {"type" : "number"},
"colour" : {"type" : "string"}
}
}
},
"$ref" : "#/definitions/record:myrecord"
}' \
--property parse.key=true \
--property key.schema='
{
"definitions" : {
"record:mykey" : {
"type" : "object",
"required" : [ "id" ],
"additionalProperties" : false,
"properties" : {
"id" : {"type" : "integer"}
}
}
},
"$ref" : "#/definitions/record:mykey"
和数据:
{"id":1} | {"timestamp":"foo", "data":"bar"}