Kafka connect JDBC 错误,尽管提供了架构和有效负载

Kafka connect JDBC error despite giving schema and payload

当我 运行 kafka JDBC 连接到 PSQL 时出现以下错误:

JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

然而,我的主题包含以下消息结构,并添加了一个架构,就像它在线呈现的那样:

rowtime: 2022/02/04 12:45:48.520 Z, key: , value: "{"schema": {"type": "struct", "fields": [{"type": "int", "field": "ID", "optional": false}, {"type": "date", "field": "Date", "optional": false}, {"type": "varchar", "field": "ICD", "optional": false}, {"type": "int", "field": "CPT", "optional": false}, {"type": "double", "field": "Cost", "optional": false}], "optional": false, "name": "test"}, "payload": {"ID": "24427934", "Date": "2019-05-22", "ICD": "883.436", "CPT": "60502", "cost": "1374.36"}}", partition: 0

我的连接器配置是:

 curl -X PUT http://localhost:8083/connectors/claim_test/config \
    -H "Content-Type: application/json" \
    -d '{
     "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
     "connection.url":"jdbc:postgresql://localhost:5432/ae2772",
     "key.converter":"org.apache.kafka.connect.json.JsonConverter",
     "value.converter":"org.apache.kafka.connect.json.JsonConverter",
     "value.converter.schemas.enable":"true",
     "topics":"test_7",
     "auto.create":"true",
     "insert.mode":"insert"
    }'

经过一些更改后,我现在收到以下消息:

WorkerSinkTask{id=claim_test} Error converting message value in topic 'test_9' partition 0 at offset 0 and timestamp 1644005137197: Unknown schema type: int

int 不是有效的架构类型。应为 int8int16int32int64

同理,datevarchardouble也是无效的。

JSON 中使用的类型与 Postgres 或任何 SQL-specific 类型不同(日期应转换为 Unix 纪元 int64 时间或 string).

您可以在此处找到支持的架构类型:https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Schema.java