org.apache.kafka.connect.transforms.ReplaceField 无效
org.apache.kafka.connect.transforms.ReplaceField does not work
我使用的文档:https://docs.confluent.io/platform/current/connect/transforms/replacefield.html
我使用此连接器通过使用 org.apache.kafka.connect.transforms.ReplaceField 并将重命名设置为 PersonId:Id
将 PersonId 列重命名为 Id
{
"name": "SQL_Connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"database.hostname": "hostname",
"database.port": "1433",
"database.user": "user",
"database.password": "password",
"database.dbname": "sqlserver",
"database.server.name": "server",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.test",
"transforms": "RenameField,addStaticField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "PersonId:Id",
"transforms.addStaticField.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addStaticField.static.field":"table",
"transforms.addStaticField.static.value":"changedtablename",
}
}
但是当我在主题中获取值时,字段 PersonId 没有改变:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "PersonId"
}
],
"optional": true,
"name": "test.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "PersonId"
}
],
"optional": true,
"name": "test.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "change_lsn"
},
{
"type": "string",
"optional": true,
"field": "commit_lsn"
},
{
"type": "int64",
"optional": true,
"field": "event_serial_no"
}
],
"optional": false,
"name": "io.debezium.connector.sqlserver.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"field": "table"
}
],
"optional": false,
"name": "test.Envelope"
},
"payload": {
"before": null,
"after": {
"PersonId": 1,
},
"source": {
"version": "1.0.3.Final",
"connector": "sqlserver",
"name": "test",
"ts_ms": 1627628793596,
"snapshot": "true",
"db": "test",
"schema": "dbo",
"table": "TestTable",
"change_lsn": null,
"commit_lsn": "00023472:00000100:0001",
"event_serial_no": null
},
"op": "r",
"ts_ms": 1627628793596,
"table": "changedtablename"
}
}
如何更改字段?
您只能替换位于 Kafka 记录顶层的字段,如文档中的示例所示。
也就是说,您需要先 extract the after
field
我使用的文档:https://docs.confluent.io/platform/current/connect/transforms/replacefield.html
我使用此连接器通过使用 org.apache.kafka.connect.transforms.ReplaceField 并将重命名设置为 PersonId:Id
将 PersonId 列重命名为 Id{
"name": "SQL_Connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"database.hostname": "hostname",
"database.port": "1433",
"database.user": "user",
"database.password": "password",
"database.dbname": "sqlserver",
"database.server.name": "server",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.test",
"transforms": "RenameField,addStaticField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "PersonId:Id",
"transforms.addStaticField.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addStaticField.static.field":"table",
"transforms.addStaticField.static.value":"changedtablename",
}
}
但是当我在主题中获取值时,字段 PersonId 没有改变:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "PersonId"
}
],
"optional": true,
"name": "test.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "PersonId"
}
],
"optional": true,
"name": "test.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "change_lsn"
},
{
"type": "string",
"optional": true,
"field": "commit_lsn"
},
{
"type": "int64",
"optional": true,
"field": "event_serial_no"
}
],
"optional": false,
"name": "io.debezium.connector.sqlserver.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"field": "table"
}
],
"optional": false,
"name": "test.Envelope"
},
"payload": {
"before": null,
"after": {
"PersonId": 1,
},
"source": {
"version": "1.0.3.Final",
"connector": "sqlserver",
"name": "test",
"ts_ms": 1627628793596,
"snapshot": "true",
"db": "test",
"schema": "dbo",
"table": "TestTable",
"change_lsn": null,
"commit_lsn": "00023472:00000100:0001",
"event_serial_no": null
},
"op": "r",
"ts_ms": 1627628793596,
"table": "changedtablename"
}
}
如何更改字段?
您只能替换位于 Kafka 记录顶层的字段,如文档中的示例所示。
也就是说,您需要先 extract the after
field