Kafka HDFS 接收器连接器错误。[顶级类型必须是 STRUCT..]
Kafka HDFS Sink Connector error.[Top level type must be STRUCT..]
我正在使用 Kafka connect 测试 2.7 版本的 Kafka,
我遇到了我不明白的问题。
我首先使用如下配置启动了分布式连接器。
bootstrap.servers=..:9092,...:9092, ...
group.id=kafka-connect-test
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
... some internal topic configuration
plugin.path=<plugin path>
此连接器支持 8083 端口。
并且我想在 HDFS 上使用 snappy 编解码器写入 ORC 格式数据。
所以我用 REST API 和 json 数据制作了新的 Kafka HDFS 连接器,如下所示。
我不使用架构注册表。
curl -X POST <connector url:8083> \
-H Accept: application/json \
-H Content-Type: application/json \
-d
{
"name": "hdfs-sinkconnect-test",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"store.url": "hdfs:~",
"hadoop.conf.dir": "<my hadoop.conf dir>",
"hadoop.home": "<hadoop home dir>",
"tasks.max": "5",
"key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"format.class": "io.confluent.connect.hdfs.orc.OrcFormat",
"flush.size": 1000,
"avro.codec": "snappy",
"topics": "<topic name>",
"topics.dir": "/tmp/connect-logs",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"locale": "ko_KR",
"timezone": "Asia/Seoul",
"partition.duration.ms": "3600000",
"path.format": "'hour'=YYYYMMddHH/"
}
}
然后我有这样的错误信息。
# connectDistributed.out
[2021-06-28 17:14:11,596] ERROR Exception on topic partition <topic name>-<partition number>: (io.confluent.connect.hdfs.TopicPartitionWriter:409)
org.apache.kafka.connect.errors.ConnectException: Top level type must be STRUCT but was bytes
at io.confluent.connect.hdfs.orc.OrcRecordWriterProvider.write(OrcRecordWriterProvider.java:98)
at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:742)
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:385)
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:333)
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:126)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我认为此错误消息与模式信息有关。
Schema Registry 对 Kafka Connector 来说必不可少吗?
解决此错误消息的任何想法或解决方案?谢谢
写入 ORC 文件需要 Struct 类型。
选项 provided by Confluent 包括普通 JSON、JSONSchema、Avro 或 Protobuf。唯一不需要注册表的选项是普通的 JsonConverter
请注意,key.deserializer
和 value.deserializer
不是有效的连接属性。您需要参考您的 key.converter
和 value.converter
属性
如果您不愿意修改转换器,您可以尝试使用 HoistField
transformer 创建一个结构,这将创建一个只有一个字段的模式的 ORC 文件
我正在使用 Kafka connect 测试 2.7 版本的 Kafka, 我遇到了我不明白的问题。
我首先使用如下配置启动了分布式连接器。
bootstrap.servers=..:9092,...:9092, ...
group.id=kafka-connect-test
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
... some internal topic configuration
plugin.path=<plugin path>
此连接器支持 8083 端口。
并且我想在 HDFS 上使用 snappy 编解码器写入 ORC 格式数据。
所以我用 REST API 和 json 数据制作了新的 Kafka HDFS 连接器,如下所示。
我不使用架构注册表。
curl -X POST <connector url:8083> \
-H Accept: application/json \
-H Content-Type: application/json \
-d
{
"name": "hdfs-sinkconnect-test",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"store.url": "hdfs:~",
"hadoop.conf.dir": "<my hadoop.conf dir>",
"hadoop.home": "<hadoop home dir>",
"tasks.max": "5",
"key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"format.class": "io.confluent.connect.hdfs.orc.OrcFormat",
"flush.size": 1000,
"avro.codec": "snappy",
"topics": "<topic name>",
"topics.dir": "/tmp/connect-logs",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"locale": "ko_KR",
"timezone": "Asia/Seoul",
"partition.duration.ms": "3600000",
"path.format": "'hour'=YYYYMMddHH/"
}
}
然后我有这样的错误信息。
# connectDistributed.out
[2021-06-28 17:14:11,596] ERROR Exception on topic partition <topic name>-<partition number>: (io.confluent.connect.hdfs.TopicPartitionWriter:409)
org.apache.kafka.connect.errors.ConnectException: Top level type must be STRUCT but was bytes
at io.confluent.connect.hdfs.orc.OrcRecordWriterProvider.write(OrcRecordWriterProvider.java:98)
at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:742)
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:385)
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:333)
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:126)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我认为此错误消息与模式信息有关。
Schema Registry 对 Kafka Connector 来说必不可少吗?
解决此错误消息的任何想法或解决方案?谢谢
写入 ORC 文件需要 Struct 类型。
选项 provided by Confluent 包括普通 JSON、JSONSchema、Avro 或 Protobuf。唯一不需要注册表的选项是普通的 JsonConverter
请注意,key.deserializer
和 value.deserializer
不是有效的连接属性。您需要参考您的 key.converter
和 value.converter
属性
如果您不愿意修改转换器,您可以尝试使用 HoistField
transformer 创建一个结构,这将创建一个只有一个字段的模式的 ORC 文件