使用 kafka-json-schema-console-producer 生成具有键模式和值模式的消息

Using kafka-json-schema-console-producer to produce message with a key schema and a value schema

我正在尝试使用 kafka-json-schema-console-producer 发布包含 both 一个键(带有模式)和一个值(与模式)。不幸的是,我找不到满足我要求的示例。

我可以按照文档发送简单的消息:

kafka-json-schema-console-producer \
  --broker-list localhost:9092 \
  --topic some-topic \
  --property value.schema='
{
  "definitions" : {
    "record:myrecord" : {
      "type" : "object",
      "required" : [ "name", "calories" ],
      "additionalProperties" : false,
      "properties" : {
        "name" : {"type" : "string"},
        "calories" : {"type" : "number"},
        "colour" : {"type" : "string"}
      }
    }
  },
  "$ref" : "#/definitions/record:myrecord"
}' < snacks.txt

两个问题:

  1. 如何添加密钥架构?是否像添加“key.schema”并使用与 value.schema 类似的语法一样简单?
  2. 使用键模式和值模式发送 JSON 消息的实际命令是什么样的?
  1. 是,添加--property key.schema。对于磁盘上的 jsonschema 文件,也有 key.schema.file 选项,对于注册表中已有的 ID,也有 key.schema.id 选项。

  2. 参考源代码示例 - https://github.com/confluentinc/schema-registry/blob/master/json-schema-serializer/src/main/java/io/confluent/kafka/formatter/json/JsonSchemaMessageReader.java#L72

提取(从 v6.2.0 开始)

 * bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic t1 \
 * --line-reader io.confluent.kafka.formatter.JsonSchemaMessageReader \
 * --property schema.registry.url=http://localhost:8081 \
 * --property parse.key=true \
 * --property key.schema='{"type":"string"}' \
 * --property value.schema='{"type":"object","properties":{"f1":{"type":"string"}}}'
 * "type":"string"}]}'

kafka-json-schema-console-producer 是 shorthand 对于

kafka-console-producer --line-reader io.confluent.kafka.formatter.JsonSchemaMessageReader

感谢@OneCricketeer 的帮助,我才得以完成这项工作。下面是一个完整的示例,它将发送带有键和值的 JSON 架构的消息。 (顺便说一句:我展示了一个使用 Docker 的解决方案,但将其修改为不使用 Docker 应该很简单)

请注意,有几件事让我感到困惑:

  • 您将需要“key.parse=true”属性(如果省略,则不会解析密钥,您将收到错误消息,或者在使用留言)
  • 键定义看起来与值定义相同,但需要与值不同的 definition/record 名称(对于值:“definition/record:myrecord”和键:“definition/record:我的钥匙
docker exec -it schema-registry \
/usr/bin/kafka-json-schema-console-producer \
--broker-list http://kafka:9092 \
--topic source-1 \
--property key.separator="|" \
--property value.schema='
{
  "definitions" : {
    "record:myrecord" : {
      "type" : "object",
      "required" : [ "name", "calories" ],
      "properties" : {
        "name" : {"type" : "string"},
        "calories" : {"type" : "number"},
        "colour" : {"type" : "string"}
      }
    }
  },
  "$ref" : "#/definitions/record:myrecord"
}' \
--property parse.key=true \
--property key.schema='
{
  "definitions" : {
    "record:mykey" : {
      "type" : "object",
      "required" : [ "id" ],
      "additionalProperties" : false,
      "properties" : {
        "id" : {"type" : "integer"}
      }
    }
  },
  "$ref" : "#/definitions/record:mykey"

和数据:

{"id":1} | {"timestamp":"foo", "data":"bar"}