JSONParseException reading data using Kafka REST API
Kafka topic (test3)
$ kafka-console-consumer --bootstrap-server broker:9092 --topic test3 --from-beginning
"Can we write to a topic that does not exist?"
"Can we write to a topic that does not exist?"
{"foo":"bar"}
["foo","bar"]
confluent
confluent
confluent
kafka
logs
0
0
Consumer (kafka-rest API at localhost:8082)
- Create the consumer with a POST request to http://localhost:8082/consumers/rested
Request body:
{
"format": "json",
"auto.offset.reset": "earliest",
"auto.commit.enable": "false"
}
Response body:
{
"instance_id": "rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff",
"base_uri": "http://rest-proxy:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff"
}
- Create the subscription with a POST to http://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/subscription
using headers:
Host: http://localhost:8082
Content-Type: application/vnd.kafka.v2+json
and request body:
{
"topics": [
"test3"
]
}
which returns a 204 No Content response.
- Read the records by issuing a GET request to http://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/records
using headers:
Host: http://localhost:8082
Accept: application/vnd.kafka.json.v2+json
which returns the response:
{
"error_code": 50002,
"message": "Kafka error: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')\n at [Source: (byte[])\"key\"; line: 1, column: 7]"
}
How can we fix this and make sure we receive the data?
Exception (in Kafka)
The log of the running Kafka REST Proxy server shows the following exception:
rest-proxy | [2018-12-31 03:09:27,232] INFO 172.25.0.1 - - [31/Dec/2018:03:09:26 +0000] "GET /consumers/rest-consumer/instances/rest-consumer-8e49873e-13ce-46a5-be1f-0237a0369efe/records HTTP/1.1" 500 211 341 (io.confluent.rest-utils.requests)
rest-proxy | [2018-12-31 03:09:27,235] ERROR Unexpected exception in consumer read task id=io.confluent.kafkarest.v2.KafkaConsumerReadTask@59611e28 (io.confluent.kafkarest.v2.KafkaConsumerReadTask)
rest-proxy | org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy | at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy | Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy | at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy | at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
rest-proxy | at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723)
rest-proxy | at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141)
rest-proxy | at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000)
rest-proxy | at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091)
rest-proxy | at io.confluent.kafkarest.v2.JsonKafkaConsumerState.deserialize(JsonKafkaConsumerState.java:79)
rest-proxy | at io.confluent.kafkarest.v2.JsonKafkaConsumerState.createConsumerRecord(JsonKafkaConsumerState.java:64)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerReadTask.maybeAddRecord(KafkaConsumerReadTask.java:158)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerReadTask.addRecords(KafkaConsumerReadTask.java:142)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerReadTask.doPartialRead(KafkaConsumerReadTask.java:99)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerManager$RunnableReadTask.run(KafkaConsumerManager.java:370)
rest-proxy | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
rest-proxy | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
rest-proxy | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
rest-proxy | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
rest-proxy | at java.lang.Thread.run(Thread.java:748)
Consumer-Groups CLI
I can see the consumer group on the CLI, but it has no active members:
$ kafka-consumer-groups --bootstrap-server broker:9092 --list
which returns:
console-consumer-60695
console-consumer-62259
console-consumer-19307
console-consumer-47906
console-consumer-40838
rested
However, when I try to retrieve the members:
$ kafka-consumer-groups --bootstrap-server localhost:29092 --group rest-consumer --describe --members
Consumer group 'rested' has no active members.
TL;DR
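You need to wrap the key in double quotes. This is not because every key has to be quoted, but because you are reading with the JSON parser: the key has to be valid JSON, and a string wrapped in double quotes is valid JSON.
If you really need to process this message as it is, you need to read it in a format other than JSON.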
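One way to read the message in a format other than JSON, offered as a sketch rather than a confirmed fix for this setup: the REST Proxy v2 API also has a `binary` embedded format. The consumer is created with `"format": "binary"`, records are fetched with `Accept: application/vnd.kafka.binary.v2+json`, and keys and values come back base64-encoded instead of being parsed. The snippet below only shows the decoding step. The `sample_response` payload is made up for illustration, and `decode_binary_records` is a hypothetical helper name.

```python
import base64
import json


def decode_binary_records(body: str):
    """Decode the base64 key and value of each record in a binary-format reply."""
    decoded = []
    for record in json.loads(body):
        key = record.get("key")
        value = record.get("value")
        decoded.append({
            # Keys and values may be null, so only decode when present.
            "key": None if key is None else base64.b64decode(key).decode("utf-8"),
            "value": None if value is None else base64.b64decode(value).decode("utf-8"),
            "partition": record.get("partition"),
            "offset": record.get("offset"),
        })
    return decoded


# Made-up response body, shaped like a binary-format /records reply.
sample_response = json.dumps([{
    "topic": "test3",
    "key": base64.b64encode(b"key").decode("ascii"),
    "value": base64.b64encode(b'{"foo":"bar"}').decode("ascii"),
    "partition": 0,
    "offset": 0,
}])

for rec in decode_binary_records(sample_response):
    print(rec["key"], rec["value"])
```

Because nothing is parsed on the proxy side, an unquoted key such as `key` comes through as plain bytes, and the client decides how to interpret it.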
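Long answer
The key in your record has no quotes, which makes it invalid JSON, so the Jackson JSON parser fails when it tries to parse that key. (This is not obvious from the error message, but when the parser does not see a quote, a square bracket, or a curly brace, it starts assuming the token is a boolean or null.)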
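The rule can be checked outside Kafka. The sketch below uses Python's `json` module as a stand-in for Jackson, so the error text differs from the one in the question, but the same JSON grammar applies: a bare word is only valid if it is `true`, `false`, or `null`. `is_valid_json` is a hypothetical helper name.

```python
import json


def is_valid_json(payload: bytes) -> bool:
    """Return True if the raw bytes parse as a standalone JSON document."""
    try:
        json.loads(payload.decode("utf-8"))
        return True
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        return False


# The quoted key and the literals are valid JSON; the bare word key is not.
for raw in [b'"key"', b"key", b"true", b"null", b"0", b'{"foo":"bar"}']:
    print(raw, is_valid_json(raw))
```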
You can see here where it takes the key and tries to decode it as JSON.
I was able to reproduce your error with the following steps:
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "latest"}' \
http://localhost:8082/consumers/my_json_consumer
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["testjsontopic"]}' \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
./bin/kafka-console-producer \
--broker-list :9092 \
--topic testjsontopic \
--property parse.key=true \
--property key.separator="&"
>"key"&{"foo":"bar"}
*Ctrl-C
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
At this point I can read the records, but when I add a record whose key has no quotes, I get the same error as you:
./bin/kafka-console-producer \
--broker-list :9092 \
--topic testjsontopic \
--property parse.key=true \
--property key.separator="&"
>key&{"foo":"bar"}
Now, when I run this request:
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
I get this error:
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
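For anyone scripting the reproduction instead of typing the curl commands, the three REST calls can be sketched with Python's standard library. The URLs, group name, instance name, and headers are the ones from the curl commands in this answer; the function names are hypothetical. Nothing is sent when the snippet runs: `send` is only defined, since it needs a running REST Proxy.

```python
import json
import urllib.request

BASE = "http://localhost:8082"  # REST Proxy address used in the reproduction
GROUP = "my_json_consumer"
INSTANCE = "my_consumer_instance"


def create_consumer_request() -> urllib.request.Request:
    """POST that creates the consumer instance with the json format."""
    body = {"name": INSTANCE, "format": "json", "auto.offset.reset": "latest"}
    return urllib.request.Request(
        f"{BASE}/consumers/{GROUP}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/vnd.kafka.v2+json"},
        method="POST",
    )


def subscribe_request(topics) -> urllib.request.Request:
    """POST that subscribes the instance to the given topics."""
    return urllib.request.Request(
        f"{BASE}/consumers/{GROUP}/instances/{INSTANCE}/subscription",
        data=json.dumps({"topics": list(topics)}).encode("utf-8"),
        headers={"Content-Type": "application/vnd.kafka.v2+json"},
        method="POST",
    )


def read_records_request() -> urllib.request.Request:
    """GET that fetches records in the json embedded format."""
    return urllib.request.Request(
        f"{BASE}/consumers/{GROUP}/instances/{INSTANCE}/records",
        headers={"Accept": "application/vnd.kafka.json.v2+json"},
        method="GET",
    )


def send(request: urllib.request.Request):
    """Send a request; urlopen raises urllib.error.HTTPError on a 500 reply."""
    with urllib.request.urlopen(request) as response:
        raw = response.read()
        # A 204 No Content reply has an empty body.
        return response.status, (json.loads(raw) if raw else None)


for req in (create_consumer_request(), subscribe_request(["testjsontopic"]), read_records_request()):
    print(req.get_method(), req.full_url)
```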
You can also use the following command to read the keys in your topic:
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testjsontopic --property print.key=true --from-beginning
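On the producing side, one way to make sure keys are always valid JSON is to JSON-encode them before they reach the producer. The sketch below builds an input line for `kafka-console-producer` as used above (`parse.key=true`, `key.separator="&"`). `console_producer_line` is a hypothetical helper, and the separator check assumes the producer splits each line at the separator, so the separator must not appear in the key.

```python
import json


def console_producer_line(key, value, separator: str = "&") -> str:
    """Build one input line for kafka-console-producer with parse.key=true."""
    # json.dumps wraps a plain string in double quotes, making it valid JSON.
    encoded_key = json.dumps(key)
    encoded_value = json.dumps(value, separators=(",", ":"))
    if separator in encoded_key:
        raise ValueError("separator must not appear in the encoded key")
    return f"{encoded_key}{separator}{encoded_value}"


print(console_producer_line("key", {"foo": "bar"}))  # prints "key"&{"foo":"bar"}
```

The printed line is the same as the quoted-key input from the reproduction, which the REST Proxy was able to read in the `json` format.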