Kafka HDFS 接收器连接器错误。[顶级类型必须是 STRUCT..]

Kafka HDFS Sink Connector error.[Top level type must be STRUCT..]

我正在使用 Kafka connect 测试 2.7 版本的 Kafka, 我遇到了我不明白的问题。

我首先使用如下配置启动了分布式连接器。

bootstrap.servers=..:9092,...:9092, ...
group.id=kafka-connect-test
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

... some internal topic configuration

plugin.path=<plugin path>

此连接器支持 8083 端口。

并且我想在 HDFS 上使用 snappy 编解码器写入 ORC 格式数据。
所以我用 REST API 和 json 数据制作了新的 Kafka HDFS 连接器,如下所示。 我不使用架构注册表。

curl -X POST <connector url:8083> \
-H Accept: application/json \
-H Content-Type: application/json \
-d
{
    "name": "hdfs-sinkconnect-test",
    "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "store.url": "hdfs:~",
        "hadoop.conf.dir": "<my hadoop.conf dir>",
        "hadoop.home": "<hadoop home dir>",
        "tasks.max": "5",
        "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
        "format.class": "io.confluent.connect.hdfs.orc.OrcFormat",
        "flush.size": 1000,
        "avro.codec": "snappy",
        "topics": "<topic name>",
        "topics.dir": "/tmp/connect-logs",
        "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "locale": "ko_KR",
        "timezone": "Asia/Seoul",
        "partition.duration.ms": "3600000",
        "path.format": "'hour'=YYYYMMddHH/"
    }
}

然后我有这样的错误信息。

# connectDistributed.out

[2021-06-28 17:14:11,596] ERROR Exception on topic partition <topic name>-<partition number>:  (io.confluent.connect.hdfs.TopicPartitionWriter:409)
org.apache.kafka.connect.errors.ConnectException: Top level type must be STRUCT but was bytes
        at io.confluent.connect.hdfs.orc.OrcRecordWriterProvider.write(OrcRecordWriterProvider.java:98)
        at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:742)
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:385)
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:333)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:126)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

我认为此错误消息与模式信息有关。 Schema Registry 对 Kafka Connector 来说必不可少吗?
解决此错误消息的任何想法或解决方案?谢谢

写入 ORC 文件需要 Struct 类型。

选项 provided by Confluent 包括普通 JSON、JSONSchema、Avro 或 Protobuf。唯一不需要注册表的选项是普通的 JsonConverter

请注意,key.deserializervalue.deserializer 不是有效的连接属性。您需要参考您的 key.convertervalue.converter 属性

如果您不愿意修改转换器,您可以尝试使用 HoistField transformer 创建一个结构,这将创建一个只有一个字段的模式的 ORC 文件