How to rename primary key when using Debezium and Kafka Connect JDBC sink connector to synchronize databases?

I am trying to use Debezium to synchronize a table in an upstream database to a downstream database, following the approach described in the Debezium blog here.

In the downstream table I only need some of the columns from the upstream table, and I also want to change some of the column names, including the name of the primary key. If I do not try to rename the primary key, the synchronization works without any problems.

I am using:

I have listed the full details of my database and connector setup below.

(1) Database table definitions:

The DDL for the upstream table is:

CREATE TABLE [kafkatest.service1].dbo.Users (
    Id int IDENTITY(1,1) NOT NULL,
    Name nvarchar COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    CONSTRAINT PK_Users PRIMARY KEY (Id)
)
GO

The DDL for the downstream table is:

CREATE TABLE [kafkatest.service2].dbo.Users (
    LocalId int IDENTITY(1,1) NOT NULL, -- added to avoid IDENTITY_INSERT issue with SQL Server
    ExternalId int NOT NULL,
    ExternalName nvarchar COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    CONSTRAINT PK_Users PRIMARY KEY (LocalId)
)
GO

In particular, note that the 'Id' column in the upstream table (which is the primary key) should map to the 'ExternalId' column in the downstream table.

(2) Kafka connector definitions:

Source connector:

{
    "name": "users-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.server.name": "sqlserver",
    "database.hostname": "sqlserver",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "Password!",
    "database.dbname": "kafkatest.service1",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.users",
    "table.whitelist": "dbo.Users"
  }
}

Sink connector:

{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics.regex": "sqlserver\.dbo\.(Users)",
        "connection.url": "jdbc:sqlserver://sqlserver:1433;databaseName=kafkatest.service2",
        "connection.user": "sa",
        "connection.password": "Password!",
        "transforms": "unwrap,route,RenameField",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "(?:[^.]+)\.(?:[^.]+)\.([^.]+)",
        "transforms.route.replacement": "",
        "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.RenameField.renames": "Id:ExternalId,Name:ExternalName",
        "auto.create": "false",
        "auto.evolve": "false",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "pk.fields": "Id",
        "pk.mode": "record_key"
    }
}
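
For context on the route transform above: the Debezium SQL Server connector writes change events to a topic named serverName.schemaName.tableName, so in this setup the topic is sqlserver.dbo.Users, and RegexRouter reduces that to the bare table name, which the JDBC sink then uses as the target table. A rough sketch of that rewrite (assuming the replacement is "$1", as configured above):

incoming topic : sqlserver.dbo.Users
route regex    : (?:[^.]+)\.(?:[^.]+)\.([^.]+)   -- captures "Users"
routed topic   : Users   (becomes the sink table name)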

As far as I can tell, "pk.mode" needs to be "record_key" in order to enable deletes. I have tried setting "pk.fields" to both "Id" and "ExternalId", but neither works.

(3) Error messages:

In the first case (i.e. "pk.fields": "Id") I get the following error:

2020-08-18 10:16:16,951 INFO   ||  Unable to find fields [SinkRecordField{schema=Schema{INT32}, name='Id', isPrimaryKey=true}] among column names [ExternalId, ExternalName, LocalId]   [io.confluent.connect.jdbc.sink.DbStructure]
2020-08-18 10:16:16,952 ERROR  ||  WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot ALTER TABLE "Users" to add missing field SinkRecordField{schema=Schema{INT32}, name='Id', isPrimaryKey=true}, as the field is not optional and does not have a default value   [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.ConnectException: Cannot ALTER TABLE "Users" to add missing field SinkRecordField{schema=Schema{INT32}, name='Id', isPrimaryKey=true}, as the field is not optional and does not have a default value

In the second case (i.e. "pk.fields": "ExternalId") I get the following error:

2020-08-18 10:17:50,192 ERROR  ||  WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: PK mode for table 'Users' is RECORD_KEY with configured PK fields [ExternalId], but record key schema does not contain field: ExternalId   [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.ConnectException: PK mode for table 'Users' is RECORD_KEY with configured PK fields [ExternalId], but record key schema does not contain field: ExternalId
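
For reference, the Debezium source builds the record key from the upstream primary key column, and the ReplaceField$Value transform above only renames fields in the message value, so the key is left untouched. A rough sketch of the key for a change event on this table (the value 42 is made up; the structure follows Debezium's key schema convention):

{
  "schema": {
    "type": "struct",
    "name": "sqlserver.dbo.Users.Key",
    "fields": [ { "field": "Id", "type": "int32", "optional": false } ]
  },
  "payload": { "Id": 42 }
}

There is no ExternalId field in this key, which is what the error above is complaining about.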

Is it possible to rename the primary key when using Debezium, or do I always need to structure my database tables so that the primary key names match in the upstream and downstream databases?

Try renaming the key field as well:

"transforms": "unwrap,route,RenameField,RenameKey",
...
"transforms.RenameKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
"transforms.RenameKey.renames": "Id:ExternalId",

When you use "pk.mode": "record_key", the primary key fields from the message key are used to build the upsert query statement.
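
Putting that together, a sketch of the full sink connector config with the key rename applied (untested; the only differences from the original sink config above are the added RenameKey transform and pk.fields pointing at the renamed field):

{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics.regex": "sqlserver\\.dbo\\.(Users)",
        "connection.url": "jdbc:sqlserver://sqlserver:1433;databaseName=kafkatest.service2",
        "connection.user": "sa",
        "connection.password": "Password!",
        "transforms": "unwrap,route,RenameField,RenameKey",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "(?:[^.]+)\\.(?:[^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$1",
        "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.RenameField.renames": "Id:ExternalId,Name:ExternalName",
        "transforms.RenameKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
        "transforms.RenameKey.renames": "Id:ExternalId",
        "auto.create": "false",
        "auto.evolve": "false",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "pk.fields": "ExternalId",
        "pk.mode": "record_key"
    }
}

With the key field renamed before the sink builds its statements, the upsert and delete statements should target the ExternalId column in the downstream Users table.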