使用 kafka-json-schema-console-consumer 仅使用 returns 值而不是键从 KAFKA 使用 key/value

Consuming a key/value from KAFKA using kafka-json-schema-console-consumer only returns value but not key

我正在构建一个 Kafka 源连接器,并且能够成功发布一个带有键及其架构的 Kafka SourceRecord,以及带有值和架构的架构。但是,当我使用 kafka-json-schema-console-consumer 来使用主题上的消息时,我希望同时收到键和值,但我只收到值而未收到键。感谢您的帮助!

下面是我手头的相关代码 tried/used:

我发布的消息:

   "message": {
        "key": {
            "id": "some-id"
        },
        "data": {
            "text": "some-text"
        }
    }

消息按如下方式放入 SourceRecord 中:

:
:
SourceRecord sr = new SourceRecord(
            sourcePartition, sourceOffset, this.topicName, partition,
            keySchema, key, valueSchema, value);
log.info("HttpSourceTask:buildSourceRecord - source:{}", sr);

我使用日志消息来确认消息已成功处理...下面是 Kafka 中的控制台日志消息,它确认在 SourceRecord 中正确设置了键加键模式和值加值模式:

connect            | [2022-01-29 21:24:47,455] INFO HttpSourceTask:buildSourceRecord - source:SourceRecord{sourcePartition={source=bgs-topic}, sourceOffset={offset=0}} ConnectRecord{topic='bgs-topic', kafkaPartition=null, key=Struct{id=some-id}, keySchema=Schema{STRUCT}, value=Struct{text=some-text}, valueSchema=Schema{STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

请注意,key 和 keySchema 字段是用 STRUCT 填充的(value 和 valueSchema 字段也是如此)。

我正在使用以下代码(我在 docker 中使用 Kafka)来使用消息...这是似乎 return 不正确数据的部分(关键是失踪)。

docker exec -it schema-registry \
/usr/bin/kafka-json-schema-console-consumer \
--bootstrap-server http://kafka:9092 \
--topic bgs-topic \
--from-beginning \
--property key.separator="|" \
--property value.schema='
{
  "title": "simple-data-schema",
  "type" : "object",
  "required" : [ "text" ],
  "properties" : {
    "text" : {
      "type": "string"
    }
  }
}' \
--property parse.key=true \
--property key.schema='
{
  "title": "simple-key-schema",
  "type" : "object",
  "required" : [ "id" ],
  "properties" : {
    "id" : {
      "type": "string"
    }
  }
}'

这是 returned 消息 - 请注意唯一可用的值:

{"text":"some-text"}

发现错误...我使用了“parse.key=true”,但它应该是“print.key=true”...一旦做出此更改,就会提供正确的输出:

{"id":"some-id"}|{"text":"some-text"}

修改后的命令如下:

docker 执行 -it schema-registry
/usr/bin/kafka-json-schema-console-consumer
--bootstrap-server http://kafka:9092
--topic bgs-topic
--from-beginning
--属性 key.separator="|"
--属性value.schema=' { "title": "simple-data-schema", “类型”:“对象”, “必填”:[“文本”], “特性” : { “文本” : { “类型”:“字符串” } } }'
--属性 print.key=真
--属性key.schema=' { "title": "simple-key-schema", “类型”:“对象”, “必填”:[“id”], “特性” : { “ID” : { “类型”:“字符串” } } }'