使用 kafka 控制台消费者消费时显示无效字符
Shows invalid characters while consuming using kafka console consumer
使用 Kafka 控制台使用者或 kt(Kafka 的 GoLang CLI 工具)从 Kafka 主题消费时,我收到无效字符。
...
\u0000\ufffd?\u0006app\u0000\u0000\u0000\u0000\u0000\u0000\u003e@\u0001
\u0000\u000cSec-39\u001aSome Actual Value Text\ufffd\ufffd\ufffd\ufffd\ufffd
\ufffd\u0015@\ufffd\ufffd\ufffd\ufffd\ufffd\ufff
...
尽管 Kafka 连接实际上可以将正确的数据发送到 SQL 数据库。
默认情况下,控制台消费者工具使用 ByteArrayDeserializer
反序列化消息键和值,但随后显然会尝试使用默认格式化程序将数据打印到命令行。
不过,此工具允许自定义使用的反序列化器和格式化程序。请参阅帮助输出中的以下摘录:
--formatter <String: class> The name of a class to use for
formatting kafka messages for
display. (default: kafka.tools.
DefaultMessageFormatter)
--property <String: prop> The properties to initialize the
message formatter. Default
properties include:
print.timestamp=true|false
print.key=true|false
print.value=true|false
key.separator=<key.separator>
line.separator=<line.separator>
key.deserializer=<key.deserializer>
value.deserializer=<value.
deserializer>
Users can also pass in customized
properties for their formatter; more
specifically, users can pass in
properties keyed with 'key.
deserializer.' and 'value.
deserializer.' prefixes to configure
their deserializers.
--key-deserializer <String:
deserializer for key>
--value-deserializer <String:
deserializer for values>
使用这些设置,您应该能够将输出更改为您想要的。
鉴于你说
Kafka connect can actually sink the proper data to an SQL database.
我的假设是您正在对主题数据使用 Avro 序列化。正确配置的 Kafka Connect 将获取 Avro 数据并将其反序列化。
但是,kafka-console-consumer
、kt
、kafkacat
等控制台工具不支持Avro,所以你得到一个如果您使用它们从 Avro 编码的主题中读取数据,则会出现一堆奇怪的字符。
要将 Avro 数据读取到命令行,您可以使用 kafka-avro-console-consumer
:
kafka-avro-console-consumer
--bootstrap-server kafka:29092\
--topic test_topic_avro \
--property schema.registry.url=http://schema-registry:8081
编辑:也添加来自@CodeGeas 的建议:
或者,使用 REST Proxy 读取数据可以通过以下方式完成:
# Create a consumer for JSON data
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
# Subscribe the consumer to a topic
http://kafka-rest-instance:8082/consumers/my_json_consumer
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"topics":["YOUR-TOPIC-NAME"]}' \
http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
# Then consume some data from a topic using the base URL in the first response.
curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
稍后,删除消费者:
curl -X DELETE -H "Accept: application/vnd.kafka.avro.v2+json" \
http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance
使用 Kafka 控制台使用者或 kt(Kafka 的 GoLang CLI 工具)从 Kafka 主题消费时,我收到无效字符。
...
\u0000\ufffd?\u0006app\u0000\u0000\u0000\u0000\u0000\u0000\u003e@\u0001
\u0000\u000cSec-39\u001aSome Actual Value Text\ufffd\ufffd\ufffd\ufffd\ufffd
\ufffd\u0015@\ufffd\ufffd\ufffd\ufffd\ufffd\ufff
...
尽管 Kafka 连接实际上可以将正确的数据发送到 SQL 数据库。
默认情况下,控制台消费者工具使用 ByteArrayDeserializer
反序列化消息键和值,但随后显然会尝试使用默认格式化程序将数据打印到命令行。
不过,此工具允许自定义使用的反序列化器和格式化程序。请参阅帮助输出中的以下摘录:
--formatter <String: class> The name of a class to use for
formatting kafka messages for
display. (default: kafka.tools.
DefaultMessageFormatter)
--property <String: prop> The properties to initialize the
message formatter. Default
properties include:
print.timestamp=true|false
print.key=true|false
print.value=true|false
key.separator=<key.separator>
line.separator=<line.separator>
key.deserializer=<key.deserializer>
value.deserializer=<value.
deserializer>
Users can also pass in customized
properties for their formatter; more
specifically, users can pass in
properties keyed with 'key.
deserializer.' and 'value.
deserializer.' prefixes to configure
their deserializers.
--key-deserializer <String:
deserializer for key>
--value-deserializer <String:
deserializer for values>
使用这些设置,您应该能够将输出更改为您想要的。
鉴于你说
Kafka connect can actually sink the proper data to an SQL database.
我的假设是您正在对主题数据使用 Avro 序列化。正确配置的 Kafka Connect 将获取 Avro 数据并将其反序列化。
但是,kafka-console-consumer
、kt
、kafkacat
等控制台工具不支持Avro,所以你得到一个如果您使用它们从 Avro 编码的主题中读取数据,则会出现一堆奇怪的字符。
要将 Avro 数据读取到命令行,您可以使用 kafka-avro-console-consumer
:
kafka-avro-console-consumer
--bootstrap-server kafka:29092\
--topic test_topic_avro \
--property schema.registry.url=http://schema-registry:8081
编辑:也添加来自@CodeGeas 的建议:
或者,使用 REST Proxy 读取数据可以通过以下方式完成:
# Create a consumer for JSON data
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
# Subscribe the consumer to a topic
http://kafka-rest-instance:8082/consumers/my_json_consumer
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"topics":["YOUR-TOPIC-NAME"]}' \
http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
# Then consume some data from a topic using the base URL in the first response.
curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
稍后,删除消费者:
curl -X DELETE -H "Accept: application/vnd.kafka.avro.v2+json" \
http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance