Debezium Postgres sink connector fails to insert values with type DATE

After setting up the source and sink connectors, I ran into a problem with a Postgres column of type DATE:

ERROR: column "foo" is of type date but expression is of type integer

I checked the Avro schema and found that the column foo is serialized as io.debezium.time.Date:

{
    "default": null,
    "name": "foo",
    "type": [
        "null",
        {
            "connect.name": "io.debezium.time.Date",
            "connect.version": 1,
            "type": "int"
        }
    ]
}
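
For context, io.debezium.time.Date encodes the date as the number of days since the Unix epoch, carried as a plain int with no logical type attached, so a JDBC sink that does not understand Debezium's semantic types just sees an INT32 and binds an integer literal. A minimal sketch of what that integer encodes (plain Java, using the value 18577 from the failed INSERT in the stack trace below):

import java.time.LocalDate;

public class EpochDayCheck {
    public static void main(String[] args) {
        // io.debezium.time.Date carries the number of days since 1970-01-01;
        // 18577 is the value the sink tried to insert into the DATE column.
        LocalDate date = LocalDate.ofEpochDay(18577);
        System.out.println(date); // 2020-11-11
    }
}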

What do I need to do so that the sink connector inserts this value correctly (as a DATE, not an INTEGER)?

Full stack trace:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "test_table" ("id","foo") VALUES (75046,18577) ON CONFLICT ("id") DO UPDATE SET "foo"=EXCLUDED."foo" was aborted: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249  Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249

    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:89)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
    ... 10 more
Caused by: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "test_table" ("id","foo") VALUES (75046,18577) ON CONFLICT ("id") DO UPDATE SET "foo"=EXCLUDED."foo" was aborted: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249  Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249

    ... 12 more

Source config:

{
    "name": "dbz-source-test-1",
    "config": {
        "name":"dbz-source-test-1",
        "connector.class":"io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname":"some.host",
        "database.port":"5432",
        "database.user":"test_debezium",
        "database.password":"password",
        "database.dbname":"dbname",
        "plugin.name":"wal2json_rds",
        "slot.name":"wal2json_rds",
        "database.server.name":"server_test",
        "table.whitelist":"public.test_table",
        "transforms":"route",
        "transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex":"([^.]+)\.([^.]+)\.([^.]+)",
        "transforms.route.replacement":"dbz_source_",
        "topic.selection.strategy":"topic_per_table",
        "include.unknown.datatypes":true,
        "decimal.handling.mode":"double",
        "snapshot.mode":"never"
    }
}

Sink config:

{
    "name": "dbz-sink-test-1",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "config.providers" : "file",
        "config.providers.file.class" : "org.apache.kafka.common.config.provider.FileConfigProvider",
        "config.providers.file.param.secrets" : "/opt/mysecrets",
        "topics": "dbz_source_test_table",
        "connection.url": "someurl",
        "connection.user": "${file:/opt/mysecrets.properties:user}",
        "connection.password" : "${file:/opt/mysecrets.properties:pass}",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "table.name.format": "dbz_source_",
        "insert.mode": "upsert",
        "pk.field": "id",
        "pk.mode": "record_value"
    }
}

I solved the problem by switching the source connector's time.precision.mode config to connect. The documentation explains:

When the time.precision.mode configuration property is set to connect, then the connector will use the predefined Kafka Connect logical types. This may be useful when consumers only know about the built-in Kafka Connect logical types and are unable to handle variable-precision time values.
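
Concretely, that just means adding one property to the source connector config above (sketch; only the last line is new, everything else stays unchanged):

{
    "name": "dbz-source-test-1",
    "config": {
        ...
        "snapshot.mode": "never",
        "time.precision.mode": "connect"
    }
}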

After the change, foo is serialized with a different type:

{
    "default": null,
    "name": "foo",
    "type": [
        "null",
        {
            "connect.name": "org.apache.kafka.connect.data.Date",
            "connect.version": 1,
            "logicalType": "date",
            "type": "int"
        }
    ]
}

The sink connector recognizes the org.apache.kafka.connect.data.Date type and inserts the value correctly.
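
For anyone wondering why this works: org.apache.kafka.connect.data.Date is one of Kafka Connect's built-in logical types, so the sink side can convert the serialized int back into a java.util.Date before binding the statement parameter, and the Postgres driver then sends a proper DATE. A rough illustration using the public Connect API (not the sink's internal code):

import java.util.Date;
import org.apache.kafka.connect.data.Schema;

public class ConnectDateRoundTrip {
    public static void main(String[] args) {
        // Schema of the built-in logical type org.apache.kafka.connect.data.Date
        Schema schema = org.apache.kafka.connect.data.Date.SCHEMA;

        // On the wire the value is still an int (days since epoch), but the
        // attached logical type lets consumers turn it back into a java.util.Date.
        Date date = org.apache.kafka.connect.data.Date.toLogical(schema, 18577);
        System.out.println(date); // the instant 2020-11-11T00:00:00Z

        // ...and back to the serialized form.
        int days = org.apache.kafka.connect.data.Date.fromLogical(schema, date);
        System.out.println(days); // 18577
    }
}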