Kafka Connect HDFS sink connector fails even when JSON data contains schema and payload fields

I am trying to move JSON data from Kafka to HDFS using the Kafka Connect HDFS sink connector.

The Kafka Connect task fails with the following error even though the JSON data in Kafka contains schema and payload fields:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields.

Data in Kafka:

./bin/kafka-console-consumer --topic test_hdfs_json_schema_payload_1 --zookeeper localhost:2181 --from-beginning

{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "deepak","company": "BT"}}
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "sufi","company": "BT"}}
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "vikas","company": "BT"}}

The HDFS sink job was submitted with the following command:

curl -X POST -H "Content-Type: application/json" --data '{"name": "connect-cluster-15may-308pm", "config": {"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max":"2", "hdfs.url": "hdfs://localhost:9000","flush.size": "1","topics":"test_hdfs_json_schema_payload_1","topics.dir":"/deepak/kafka-connect/trial1"}}' http://localhost:8083/connectors

Distributed Kafka Connect worker configuration:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

Error message:

http://localhost:8083/connectors/connect-cluster-15may-308pm/tasks/0/status

{
    "state": "FAILED",
    "trace": "org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n",
    "id": 0,
    "worker_id": "127.0.0.1:8083"
}

What version of Kafka Connect are you using? It helps to know that when determining the source of the error from the stack trace.

I think what's happening is that you have data in the value, but not in the key. Since you have both key.converter and value.converter set to JsonConverter with schemas.enable=true, it expects to see the envelope format containing schema and payload for both the key and the value. However, I'm guessing your keys are all null.
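
For illustration, a key that JsonConverter with schemas.enable=true could deserialize would itself have to carry the same envelope, for example (a hypothetical string key):

{"schema": {"type": "string", "optional": true}, "payload": "some-key"}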

This is sort of the inverse of the problem in https://issues.apache.org/jira/browse/KAFKA-3832, where JsonConverter never generates true null values; instead, it always generates the envelope containing the optional schema plus a null payload. In your case the conversion in the other direction, from Kafka to Connect's data API, doesn't work, because it expects that same envelope format in the keys.
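
Concretely, what JsonConverter emits for a null value under schemas.enable=true is an envelope along these lines, rather than a bare null:

{"schema": null, "payload": null}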

You can verify this is the problem by adding --property print.key=true to your console consumer command. If it prints out null keys, the problem is that the JsonConverter cannot decode them.
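
With your consumer command from above, that would be:

./bin/kafka-console-consumer --topic test_hdfs_json_schema_payload_1 --zookeeper localhost:2181 --from-beginning --property print.key=true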

An easy workaround is to use some other Converter for the keys, one that doesn't care about null values -- there is no data in the keys anyway. One that ships with Kafka Connect is org.apache.kafka.connect.storage.StringConverter.
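
A minimal sketch of that change in your distributed worker configuration, keeping JsonConverter for the values:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

StringConverter ignores the schemas.enable setting, so key.converter.schemas.enable can simply be dropped. The worker configuration is read at startup, so the workers need a restart for the change to take effect.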