我可以编写自定义 kafka 连接转换以将 JSON 转换为 AVRO 吗?

Can I write custom kafka connect transform for converting JSON to AVRO?

我想使用 kafka-connect-hdfs 将无模式 json 记录从 kafka 写入 hdfs 文件。 如果我将 JsonConvertor 用作 key/value 转换器,则它无法正常工作。但是,如果我使用的是 StringConvertor,那么它会将 json 写入转义字符串。

例如:

实际 json -

{"name":"test"}

写入 hdfs 文件的数据 -

"{\"name\":\"test\"}"

预期输出到 hdfs 文件 -

{"name":"test"}

是否有任何方法或替代方案可以实现此目的,或者我必须仅将其与架构一起使用?

下面是我尝试使用 JSONConvertor 时遇到的异常:

[2017-09-06 14:40:19,344] ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:406)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

quickstart-hdfs.properties的配置:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs_avro
hdfs.url=hdfs://localhost:9000
flush.size=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

connect-avro-standalone.properties的配置:

bootstrap.servers=localhost:9092
schemas.enable=false
key.converter.schemas.enable=false
value.converter.schemas.enable=false

当您在连接器的配置属性中指定一个转换器时,您需要包括与该转换器相关的所有属性,无论这些属性是否也包含在工作者的配置中。

在上面的示例中,您需要同时指定:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

quickstart-hdfs.properties.

仅供参考,JSON 导出即将在 HDFS 连接器中推出。在此处跟踪相关的拉取请求:https://github.com/confluentinc/kafka-connect-hdfs/pull/196

更新:JsonFormat已合并到master分支