Kafka connect JDBC 错误,尽管提供了架构和有效负载
Kafka connect JDBC error despite giving schema and payload
当我 运行 kafka JDBC 连接到 PSQL 时出现以下错误:
JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
然而,我的主题包含以下消息结构,并添加了一个架构,就像它在线呈现的那样:
rowtime: 2022/02/04 12:45:48.520 Z, key: , value: "{"schema": {"type": "struct", "fields": [{"type": "int", "field": "ID", "optional": false}, {"type": "date", "field": "Date", "optional": false}, {"type": "varchar", "field": "ICD", "optional": false}, {"type": "int", "field": "CPT", "optional": false}, {"type": "double", "field": "Cost", "optional": false}], "optional": false, "name": "test"}, "payload": {"ID": "24427934", "Date": "2019-05-22", "ICD": "883.436", "CPT": "60502", "cost": "1374.36"}}", partition: 0
我的连接器配置是:
curl -X PUT http://localhost:8083/connectors/claim_test/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:postgresql://localhost:5432/ae2772",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"true",
"topics":"test_7",
"auto.create":"true",
"insert.mode":"insert"
}'
经过一些更改后,我现在收到以下消息:
WorkerSinkTask{id=claim_test} Error converting message value in topic 'test_9' partition 0 at offset 0 and timestamp 1644005137197: Unknown schema type: int
int
不是有效的架构类型。应为 int8
、int16
、int32
或 int64
。
同理,date
、varchar
、double
也是无效的。
JSON 中使用的类型与 Postgres 或任何 SQL-specific 类型不同(日期应转换为 Unix 纪元 int64
时间或 string
).
您可以在此处找到支持的架构类型:https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Schema.java