org.apache.kafka.connect.transforms.ReplaceField 无效

org.apache.kafka.connect.transforms.ReplaceField does not work

我使用的文档:https://docs.confluent.io/platform/current/connect/transforms/replacefield.html

我使用下面的连接器配置,通过 org.apache.kafka.connect.transforms.ReplaceField 并将 renames 属性设置为 PersonId:Id,尝试将 PersonId 列重命名为 Id:
{
    "name": "SQL_Connector",
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max": "1",
        "database.hostname": "hostname",
        "database.port": "1433",
        "database.user": "user",
        "database.password": "password",
        "database.dbname": "sqlserver",
        "database.server.name": "server",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "dbhistory.test",

        "transforms": "RenameField,addStaticField",

        "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.RenameField.renames": "PersonId:Id",

        "transforms.addStaticField.type":"org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.addStaticField.static.field":"table",
        "transforms.addStaticField.static.value":"changedtablename"
    }
}

但是当我在主题中获取值时,字段 PersonId 没有改变:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "PersonId"
                    }
                ],
                "optional": true,
                "name": "test.Value",
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "PersonId"
                    }
                ],
                "optional": true,
                "name": "test.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "name": "io.debezium.data.Enum",
                        "version": 1,
                        "parameters": {
                            "allowed": "true,last,false"
                        },
                        "default": "false",
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "schema"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "table"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "change_lsn"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "commit_lsn"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "event_serial_no"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.sqlserver.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
        ],
        "optional": false,
        "name": "test.Envelope"
    },
    "payload": {
        "before": null,
        "after": {
            "PersonId": 1
        },
        "source": {
            "version": "1.0.3.Final",
            "connector": "sqlserver",
            "name": "test",
            "ts_ms": 1627628793596,
            "snapshot": "true",
            "db": "test",
            "schema": "dbo",
            "table": "TestTable",
            "change_lsn": null,
            "commit_lsn": "00023472:00000100:0001",
            "event_serial_no": null
        },
        "op": "r",
        "ts_ms": 1627628793596,
        "table": "changedtablename"
    }
}

如何更改字段?

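ReplaceField 只能替换位于 Kafka 记录顶层的字段,如文档中的示例所示。而在 Debezium 的变更事件中,PersonId 嵌套在 before/after 结构内,顶层只有 before、after、source、op、ts_ms 等字段,因此重命名没有生效。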

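也就是说,您需要先提取 after 字段,再进行重命名:例如在 transforms 列表中,把 org.apache.kafka.connect.transforms.ExtractField$Value(field 设为 after)或 Debezium 的 io.debezium.transforms.ExtractNewRecordState 放在 RenameField 之前,这样 PersonId 就会位于记录顶层,ReplaceField 才能将其重命名为 Id。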