无法通过 kafka-avro-console-consumer 读取 avro 消息(最终目标通过 spark streaming 读取)

unable to read avro message via kafka-avro-console-consumer (end goal read it via spark streaming)

(最终目标) 在尝试我是否最终可以从 Confluent 平台读取 avro 数据,usng spark 流之前,就像这里描述的那样:

我想验证一下我是否可以使用下面的命令来读取它们:

$ kafka-avro-console-consumer \
> --topic my-topic-produced-using-file-pulse-xml \
> --from-beginning \
> --bootstrap-server localhost:9092 \
> --property schema.registry.url=http://localhost:8081

我收到这条错误消息未知的魔法字节

Processed a total of 1 messages
[2020-09-10 12:59:54,795] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2020-09-10 12:59:54,795] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

注意,可以这样阅读消息(使用控制台消费者而不是 avro-console-consumer):

kafka-console-consumer \
--bootstrap-server localhost:9092 --group my-group-console \
--from-beginning \
--topic my-topic-produced-using-file-pulse-xml

消息是使用 confluent connect file-pulse (1.5.2) 读取 xml 文件 (streamthoughts/kafka-connect-file-pulse)

生成的

请帮忙: 我用错了 kafka-avro-console-consumer 吗? 我尝试了此处描述的“解串器”属性选项:,没有帮助

还不想鼓起勇气启动spark streaming读取数据

我使用的 file-pulse 1.5.2 属性如下所示 已添加 11/09/2020 完成。

name=connect-file-pulse-xml
connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
topic= my-topic-produced-using-file-pulse-xml
tasks.max=1

# File types
fs.scan.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter
file.filter.regex.pattern=.*\.xml$
task.reader.class=io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader
force.array.on.fields=sometagNameInXml

# File scanning
fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy
fs.scanner.class=io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker

fs.scan.directory.path=/tmp/kafka-connect/xml/
fs.scan.interval.ms=10000

# Internal Reporting
internal.kafka.reporter.bootstrap.servers=localhost:9092
internal.kafka.reporter.id=connect-file-pulse-xml
internal.kafka.reporter.topic=connect-file-pulse-status

# Track file by name
offset.strategy=name

如果您从消费者那里得到未知的 Magic Byte,那么生产者没有使用 Confluent AvroSerializer,并且可能推送了不使用 Schema Registry 的 Avro 数据。

如果不查看 Producer 代码或使用和检查二进制格式的数据,很难知道是哪种情况。

The message was produced using confluent connect file-pulse

您是否将 value.converter 与 AvroConverter class 一起使用?