Kafka Connect JDBC sink: error flattening JSON records
I'm using Confluent's Kafka Connect JDBC Sink Connector to store data from topics into a SQL Server table. The data needs to be flattened. I've created a SQL Server table and a JSON record based on the example provided.
So my record looks like this:
{
"payload":{
"id": 42,
"name": {
"first": "David"
}
},
"schema": {
"fields": [
{
"field": "id",
"optional": true,
"type": "int32"
},
{
"name": "name",
"optional": "false",
"type": "struct",
"fields": [
{
"field": "first",
"optional": true,
"type": "string"
}
]
}
],
"name": "Test",
"optional": false,
"type": "struct"
}
}
As you can see, I want to flatten the fields, joining them with the delimiter "_". So my sink connector configuration is as follows:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
table.name.format=MyTable
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
topics=myTopic
tasks.max=1
transforms=flatten
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
connection.url=jdbc:sqlserver:[url]
transforms.flatten.delimiter=_
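For reference, the same settings can be expressed as the JSON body used when registering the connector through the Connect REST API in distributed mode (the connector name `my-jdbc-sink` is a placeholder; the properties are identical to the file above):

```json
{
  "name": "my-jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "myTopic",
    "table.name.format": "MyTable",
    "connection.url": "jdbc:sqlserver:[url]",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "transforms": "flatten",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": "_"
  }
}
```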
When I write that record to the topic, I get the following exception:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Struct schema's field name not specified properly
at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(JsonConverter.java:512)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:360)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
The sink connector works fine for records that don't need flattening. Is there something wrong with my configuration? Is it even possible to flatten JSON with a schema?
P.S. Kafka Connect version: 5.3.0-css
Any help would be greatly appreciated.
OK, the problem was the key used for the nested field's name in the schema. The correct key is "field", not "name":
{
"payload":{
"id": 42,
"name": {
"first": "David"
}
},
"schema": {
"fields": [
{
"field": "id",
"optional": true,
"type": "int32"
},
{
**"field": "name",**
"optional": "false",
"type": "struct",
"fields": [
{
"field": "first",
"optional": true,
"type": "string"
}
]
}
],
"name": "Test",
"optional": false,
"type": "struct"
}
}
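For anyone wondering what the Flatten transform actually produces: it collapses nested structs into top-level fields whose names are joined by the configured delimiter, so the record above ends up with fields `id` and `name_first`. A minimal Python sketch of that behavior (the `flatten` function is illustrative, not part of Kafka Connect):

```python
def flatten(record, delimiter="_", prefix=""):
    """Recursively collapse nested dicts into a single-level dict,
    joining nested keys with the given delimiter."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{delimiter}{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested structs, carrying the joined prefix.
            flat.update(flatten(value, delimiter, name))
        else:
            flat[name] = value
    return flat

payload = {"id": 42, "name": {"first": "David"}}
print(flatten(payload))  # {'id': 42, 'name_first': 'David'}
```

The flattened field names are what the JDBC sink maps to column names, so the target SQL Server table needs columns `id` and `name_first`.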