Kafka S3 Connect "Value is not Struct type" error
I load the S3 connector with the following parameters:
confluent load s3-sink
{
"name": "s3-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "1",
"topics": "s3_topic",
"s3.region": "us-east-1",
"s3.bucket.name": "some_bucket",
"s3.part.size": "5242880",
"flush.size": "1",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
"schema.compatibility": "NONE",
"partition.field.name": "f1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"name": "s3-sink"
},
"tasks": [
{
"connector": "s3-sink",
"task": 0
}
],
"type": null
}
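For reference, the same configuration can also be submitted through the Kafka Connect REST API. This is only a sketch: it assumes the Connect worker listens on localhost:8083, and that only the name and config fields are posted (tasks and type come from the status output, not the input):
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
        "name": "s3-sink",
        "config": {
          "connector.class": "io.confluent.connect.s3.S3SinkConnector",
          "topics": "s3_topic",
          "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
          "partition.field.name": "f1"
        }
      }'
(The remaining properties are the same as in the config object shown above.)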
Next I send JSON with kafka-console-producer:
{"f1":"partition","data":"some data"}
I get the following error in the Connect log:
[2018-05-16 16:32:05,150] ERROR Value is not Struct type. (io.confluent.connect.storage.partitioner.FieldPartitioner:67)
[2018-05-16 16:32:05,150] ERROR WorkerSinkTask{id=s3-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:515)
io.confluent.connect.storage.errors.PartitionException: Error encoding partition.
I remember it working before.
I'm now using Confluent Open Source v4.1.
As of Confluent 4.1, the FieldPartitioner does not support field extraction from schemaless JSON: with value.converter.schemas.enable=false the JsonConverter deserializes the value as a plain Map rather than a Struct, which is exactly what the "Value is not Struct type" error is complaining about.
You could instead use kafka-avro-console-producer to send the same JSON blob with an Avro schema, and then it should work.
Here is the property you would use:
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"},{"name":"data","type":"string"}]}'
Then you can send:
{"f1":"partition","data":"some data"}
And you would need these properties in Connect:
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "true",