Kafka s3 json 连接器

Kafka s3 json connector

我尝试使用最新的 kafka (confluent-platform-2.11) 连接将 Json 放入 s3。我设置 format.class=io.confluent.connect.s3.format.json.JsonFormatquickstart-s3.properties 文件

并加载连接器:

~$ confluent load s3-sink  {   "name": "s3-sink",   "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "s3_hose",
    "s3.region": "us-east-1",
    "s3.bucket.name": "some-bucket-name",
    "s3.part.size": "5242880",
    "flush.size": "1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE",
    "name": "s3-sink"   },   "tasks": [
    {
      "connector": "s3-sink",
      "task": 0
    }   ],   "type": null }

然后我向Kafka发送了一行:

~$ kafka-console-producer --broker-list localhost:9092 --topic s3_hose

{"q":1}

我在连接器日志中看到 Avro 转换异常:

[2018-01-14 14:41:30,832] ERROR WorkerSinkTask{id=s3-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runti me.WorkerTask:172) org.apache.kafka.connect.errors.DataException: s3_hose
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:96)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:454)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

如果我设置 format.class=io.confluent.connect.s3.format.json.JsonFormat 为什么会尝试使用某些 Avro 转换器?

此消息是指转换器。这与最终输出格式不同。它用于将 Kafka 中的数据转换为连接数据 API 格式,以便连接器具有可使用的标准。要设置转换器,您可以

1) 将 key.converter 和 value.converter 设置为 worker 属性文件中的内置 JsonConverter,使其成为 worker[=] 中所有连接器 运行 的默认值10=]

2) 在连接器级别设置 key.converter 和 value.converter 属性以覆盖在工作级别设置的内容

请注意,由于这是一个接收器连接器,您将非常希望将您的转换器与主题中的数据类型相匹配,以便它可以正确转换。