kafka connect JDBC sink. Error flattening JSON records

I'm using Confluent's Kafka Connect JDBC Sink Connector to store data from topics into a SQL Server table. The data needs to be flattened, so I've created a SQL Server table and a JSON record based on the example provided.

So my record is this:

{
    "payload":{ 
        "id": 42,
        "name": {
          "first": "David"
        }
    },
    "schema": {
        "fields": [
            {
                "field": "id",
                "optional": true,
                "type": "int32"
            },
            {
                "name": "name",
                "optional": "false",
                "type": "struct",
                "fields": [
                    {
                        "field": "first",
                        "optional": true,
                        "type": "string"
                    }
                ]
            }
        ],
        "name": "Test",
        "optional": false,
        "type": "struct"
    }   
}

As you can see, I want to flatten the fields, joining them with the delimiter "_". My Sink Connector configuration is therefore as follows:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
table.name.format=MyTable
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
topics=myTopic
tasks.max=1
transforms=flatten
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
connection.url=jdbc:sqlserver:[url]
transforms.flatten.delimiter=_
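
For clarity, this is the flat value I expect the sink to see after the Flatten$Value transform, so that MyTable ends up with the columns id and name_first (my own illustration of the intended result, not output I've captured):

{
    "id": 42,
    "name_first": "David"
}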

When I write that record to the topic, I get the following exception:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Struct schema's field name not specified properly
    at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(JsonConverter.java:512)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:360)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more

The sink connector works fine for records that don't need flattening. Is there something wrong with my configuration? Is it possible to flatten JSON that carries a schema?

P.S. Kafka Connect version: 5.3.0-css

Any help would be appreciated.

OK, the problem was the name of the nested field in the schema. The correct key is "field", not "name":

{
    "payload":{ 
        "id": 42,
        "name": {
          "first": "David"
        }
    },
    "schema": {
        "fields": [
            {
                "field": "id",
                "optional": true,
                "type": "int32"
            },
            {
                **"field": "name",**
                "optional": "false",
                "type": "struct",
                "fields": [
                    {
                        "field": "first",
                        "optional": true,
                        "type": "string"
                    }
                ]
            }
        ],
        "name": "Test",
        "optional": false,
        "type": "struct"
    }   
}
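
With "field" used for the nested struct as well, the JsonConverter can build the schema and the Flatten transform then does its job. As a sketch (not captured output; the optional flags are simply carried over from the schema above), the corrected record is equivalent to producing this already-flattened record:

{
    "payload": {
        "id": 42,
        "name_first": "David"
    },
    "schema": {
        "fields": [
            {
                "field": "id",
                "optional": true,
                "type": "int32"
            },
            {
                "field": "name_first",
                "optional": true,
                "type": "string"
            }
        ],
        "name": "Test",
        "optional": false,
        "type": "struct"
    }
}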