Debezium Postgres sink connector fails to insert values with type DATE
After setting up the source and sink connectors, I ran into a problem with Postgres columns of type DATE.
ERROR: column "foo" is of type date but expression is of type integer
I checked the Avro schema and found that column foo is serialized as io.debezium.time.Date:
{
  "default": null,
  "name": "foo",
  "type": [
    "null",
    {
      "connect.name": "io.debezium.time.Date",
      "connect.version": 1,
      "type": "int"
    }
  ]
}
What should I do so that the sink connector inserts this value correctly (as a DATE, not an INTEGER)?
Full stack trace:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "test_table" ("id","foo") VALUES (75046,18577) ON CONFLICT ("id") DO UPDATE SET "foo"=EXCLUDED."foo" was aborted: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249 Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:89)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
... 10 more
Caused by: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "test_table" ("id","foo") VALUES (75046,18577) ON CONFLICT ("id") DO UPDATE SET "foo"=EXCLUDED."foo" was aborted: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249 Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249
... 12 more
Source config:
{
  "name": "dbz-source-test-1",
  "config": {
    "name": "dbz-source-test-1",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "some.host",
    "database.port": "5432",
    "database.user": "test_debezium",
    "database.password": "password",
    "database.dbname": "dbname",
    "plugin.name": "wal2json_rds",
    "slot.name": "wal2json_rds",
    "database.server.name": "server_test",
    "table.whitelist": "public.test_table",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "dbz_source_",
    "topic.selection.strategy": "topic_per_table",
    "include.unknown.datatypes": true,
    "decimal.handling.mode": "double",
    "snapshot.mode": "never"
  }
}
Sink config:
{
  "name": "dbz-sink-test-1",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "config.providers": "file",
    "config.providers.file.class": "org.apache.kafka.common.config.provider.FileConfigProvider",
    "config.providers.file.param.secrets": "/opt/mysecrets",
    "topics": "dbz_source_test_table",
    "connection.url": "someurl",
    "connection.user": "${file:/opt/mysecrets.properties:user}",
    "connection.password": "${file:/opt/mysecrets.properties:pass}",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "table.name.format": "dbz_source_",
    "insert.mode": "upsert",
    "pk.field": "id",
    "pk.mode": "record_value"
  }
}
I solved the issue by switching the source connector's time.precision.mode config to connect.
When the time.precision.mode configuration property is set to connect, then the connector will use the predefined Kafka Connect logical types. This may be useful when consumers only know about the built-in Kafka Connect logical types and are unable to handle variable-precision time values.
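As a rough sketch of where the property goes, the snippet below adds time.precision.mode to a subset of the source config from the question. The helper name with_connect_time_precision is hypothetical, and only a few of the original properties are repeated here; the resulting config would still need to be resubmitted to Kafka Connect in whatever way the connector was originally registered.

```python
import json

# Subset of the source connector config from the question. The other
# properties are left out for brevity only; they should stay in the real config.
source_config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.server.name": "server_test",
    "table.whitelist": "public.test_table",
}


def with_connect_time_precision(config):
    """Return a copy of config with time.precision.mode set to connect.

    Hypothetical helper for illustration; it does not talk to Kafka Connect.
    """
    updated = dict(config)  # copy so the original dict is not mutated
    updated["time.precision.mode"] = "connect"
    return updated


updated_config = with_connect_time_precision(source_config)
print(json.dumps(updated_config, indent=2))
```

Only the one property is added; everything else in the source config stays as it was.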
After that change, the serialized type is different:
{
  "default": null,
  "name": "foo",
  "type": [
    "null",
    {
      "connect.name": "org.apache.kafka.connect.data.Date",
      "connect.version": 1,
      "logicalType": "date",
      "type": "int"
    }
  ]
}
The sink connector recognizes the org.apache.kafka.connect.data.Date type and inserts it correctly.
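For context on why the failing INSERT carried the value 18577: both io.debezium.time.Date and org.apache.kafka.connect.data.Date encode a date as an int counting days since 1970-01-01, so the payload stays the same and only the schema name changes, which appears to be what the sink uses to choose the SQL type. A minimal sketch of that decoding, using a hypothetical helper name epoch_days_to_date:

```python
from datetime import date, timedelta

# Both logical types count whole days from the Unix epoch.
EPOCH = date(1970, 1, 1)


def epoch_days_to_date(epoch_days):
    """Convert a days-since-epoch integer into a calendar date."""
    return EPOCH + timedelta(days=epoch_days)


# 18577 is the value of "foo" in the failing INSERT from the stack trace.
decoded = epoch_days_to_date(18577)
print(decoded.isoformat())  # 2020-11-11
```

Without a recognized logical type name, the sink has no way to tell this integer apart from an ordinary INTEGER column value, which matches the error Postgres reported.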