使用 kafka-json-schema-console-consumer 仅使用 returns 值而不是键从 KAFKA 使用 key/value
Consuming a key/value from KAFKA using kafka-json-schema-console-consumer only returns value but not key
我正在构建一个 Kafka 源连接器,并且能够成功发布一个带有键及其架构的 Kafka SourceRecord,以及带有值和架构的架构。但是,当我使用 kafka-json-schema-console-consumer 来使用主题上的消息时,我希望同时收到键和值,但我只收到值而未收到键。感谢您的帮助!
下面是我手头的相关代码 tried/used:
我发布的消息:
"message": {
"key": {
"id": "some-id"
},
"data": {
"text": "some-text"
}
}
消息按如下方式放入 SourceRecord 中:
:
:
SourceRecord sr = new SourceRecord(
sourcePartition, sourceOffset, this.topicName, partition,
keySchema, key, valueSchema, value);
log.info("HttpSourceTask:buildSourceRecord - source:{}", sr);
我使用日志消息来确认消息已成功处理...下面是 Kafka 中的控制台日志消息,它确认在 SourceRecord 中正确设置了键加键模式和值加值模式:
connect | [2022-01-29 21:24:47,455] INFO HttpSourceTask:buildSourceRecord - source:SourceRecord{sourcePartition={source=bgs-topic}, sourceOffset={offset=0}} ConnectRecord{topic='bgs-topic', kafkaPartition=null, key=Struct{id=some-id}, keySchema=Schema{STRUCT}, value=Struct{text=some-text}, valueSchema=Schema{STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
请注意,key 和 keySchema 字段是用 STRUCT 填充的(value 和 valueSchema 字段也是如此)。
我正在使用以下代码(我在 docker 中使用 Kafka)来使用消息...这是似乎 return 不正确数据的部分(关键是失踪)。
docker exec -it schema-registry \
/usr/bin/kafka-json-schema-console-consumer \
--bootstrap-server http://kafka:9092 \
--topic bgs-topic \
--from-beginning \
--property key.separator="|" \
--property value.schema='
{
"title": "simple-data-schema",
"type" : "object",
"required" : [ "text" ],
"properties" : {
"text" : {
"type": "string"
}
}
}' \
--property parse.key=true \
--property key.schema='
{
"title": "simple-key-schema",
"type" : "object",
"required" : [ "id" ],
"properties" : {
"id" : {
"type": "string"
}
}
}'
这是 returned 消息 - 请注意唯一可用的值:
{"text":"some-text"}
发现错误...我使用了“parse.key=true”,但它应该是“print.key=true”...一旦做出此更改,就会提供正确的输出:
{"id":"some-id"}|{"text":"some-text"}
修改后的命令如下:
docker 执行 -it schema-registry
/usr/bin/kafka-json-schema-console-consumer
--bootstrap-server http://kafka:9092
--topic bgs-topic
--from-beginning
--属性 key.separator="|"
--属性value.schema='
{
"title": "simple-data-schema",
“类型”:“对象”,
“必填”:[“文本”],
“特性” : {
“文本” : {
“类型”:“字符串”
}
}
}'
--属性 print.key=真
--属性key.schema='
{
"title": "simple-key-schema",
“类型”:“对象”,
“必填”:[“id”],
“特性” : {
“ID” : {
“类型”:“字符串”
}
}
}'
我正在构建一个 Kafka 源连接器,并且能够成功发布一个带有键及其架构的 Kafka SourceRecord,以及带有值和架构的架构。但是,当我使用 kafka-json-schema-console-consumer 来使用主题上的消息时,我希望同时收到键和值,但我只收到值而未收到键。感谢您的帮助!
下面是我手头的相关代码 tried/used:
我发布的消息:
"message": {
"key": {
"id": "some-id"
},
"data": {
"text": "some-text"
}
}
消息按如下方式放入 SourceRecord 中:
:
:
SourceRecord sr = new SourceRecord(
sourcePartition, sourceOffset, this.topicName, partition,
keySchema, key, valueSchema, value);
log.info("HttpSourceTask:buildSourceRecord - source:{}", sr);
我使用日志消息来确认消息已成功处理...下面是 Kafka 中的控制台日志消息,它确认在 SourceRecord 中正确设置了键加键模式和值加值模式:
connect | [2022-01-29 21:24:47,455] INFO HttpSourceTask:buildSourceRecord - source:SourceRecord{sourcePartition={source=bgs-topic}, sourceOffset={offset=0}} ConnectRecord{topic='bgs-topic', kafkaPartition=null, key=Struct{id=some-id}, keySchema=Schema{STRUCT}, value=Struct{text=some-text}, valueSchema=Schema{STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
请注意,key 和 keySchema 字段是用 STRUCT 填充的(value 和 valueSchema 字段也是如此)。
我正在使用以下代码(我在 docker 中使用 Kafka)来使用消息...这是似乎 return 不正确数据的部分(关键是失踪)。
docker exec -it schema-registry \
/usr/bin/kafka-json-schema-console-consumer \
--bootstrap-server http://kafka:9092 \
--topic bgs-topic \
--from-beginning \
--property key.separator="|" \
--property value.schema='
{
"title": "simple-data-schema",
"type" : "object",
"required" : [ "text" ],
"properties" : {
"text" : {
"type": "string"
}
}
}' \
--property parse.key=true \
--property key.schema='
{
"title": "simple-key-schema",
"type" : "object",
"required" : [ "id" ],
"properties" : {
"id" : {
"type": "string"
}
}
}'
这是 returned 消息 - 请注意唯一可用的值:
{"text":"some-text"}
发现错误...我使用了“parse.key=true”,但它应该是“print.key=true”...一旦做出此更改,就会提供正确的输出:
{"id":"some-id"}|{"text":"some-text"}
修改后的命令如下:
docker 执行 -it schema-registry
/usr/bin/kafka-json-schema-console-consumer
--bootstrap-server http://kafka:9092
--topic bgs-topic
--from-beginning
--属性 key.separator="|"
--属性value.schema='
{
"title": "simple-data-schema",
“类型”:“对象”,
“必填”:[“文本”],
“特性” : {
“文本” : {
“类型”:“字符串”
}
}
}'
--属性 print.key=真
--属性key.schema='
{
"title": "simple-key-schema",
“类型”:“对象”,
“必填”:[“id”],
“特性” : {
“ID” : {
“类型”:“字符串”
}
}
}'