How to get the table name and database name in CDC events received from Debezium Kafka Connect
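Setup: I have enabled CDC on MS SQL Server, and the CDC events are fed into Kafka using the Debezium Kafka Connect source connector. CDC events from more than one table are routed to a single Kafka topic.
Problem: Because the Kafka topic holds data from several tables, I want the table name and database name to be included in each CDC event.
With MySQL CDC I get the table name and database name, but with MS SQL CDC I do not.
Below is the Debezium source connector configuration for SQL Server: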
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "cdc-user_profile-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"database.hostname": "<<hostname>>",
"database.port": "<<port>>",
"database.user": "<<username>>",
"database.password": "<<password>>",
"database.server.name": "test",
"database.dbname": "testDb",
"table.whitelist": "schema01.table1,schema01.table2",
"database.history.kafka.bootstrap.servers": "broker:9092",
"database.history.kafka.topic": "digital.user_profile.schema.audit",
"database.history.store.only.monitored.tables.ddl": true,
"include.schema.changes": false,
"event.deserialization.failure.handling.mode": "fail",
"snapshot.mode": "initial_schema_only",
"snapshot.locking.mode": "none",
"transforms":"addStaticField,topicRoute",
"transforms.addStaticField.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addStaticField.static.field":"source_system",
"transforms.addStaticField.static.value":"source_system_1",
"transforms.topicRoute.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.topicRoute.regex":"(.*)",
"transforms.topicRoute.replacement":"digital.user_profile",
"errors.tolerance": "none",
"errors.log.enable": true,
"errors.log.include.messages": true,
"errors.retry.delay.max.ms": 60000,
"errors.retry.timeout": 300000
}
}'
I get the following output (demo data):
{
"before": {
"profile_id": 147,
"email_address": "test@gmail.com"
},
"after": {
"profile_id": 147,
"email_address": "test_modified@gmail.com"
},
"source": {
"version": "0.9.4.Final",
"connector": "sqlserver",
"name": "test",
"ts_ms": 1556723528917,
"change_lsn": "0007cbe5:0000b98c:0002",
"commit_lsn": "0007cbe5:0000b98c:0003",
"snapshot": false
},
"op": "u",
"ts_ms": 1556748731417,
"source_system": "source_system_1"
}
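What I need is output like the following, with db and table added to the source block (table being table1 or table2, depending on which table the change came from):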
{
"before": {
"profile_id": 147,
"email_address": "test@gmail.com"
},
"after": {
"profile_id": 147,
"email_address": "test_modified@gmail.com"
},
"source": {
"version": "0.9.4.Final",
"connector": "sqlserver",
"name": "test",
"ts_ms": 1556723528917,
"change_lsn": "0007cbe5:0000b98c:0002",
"commit_lsn": "0007cbe5:0000b98c:0003",
"snapshot": false,
"db": "testDb",
"table": "table1/table2"
},
"op": "u",
"ts_ms": 1556748731417,
"source_system": "source_system_1"
}
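Debezium Kafka Connect normally writes the data from each table to a separate topic. The topic name is built from the logical server name (database.server.name, not the hostname), followed by the database (MySQL) or schema (SQL Server) and the table; with the configuration above and without topicRoute, the topics would be test.schema01.table1 and test.schema01.table2. We generally use the topic name to identify the source table and database.
If you manually route the data from all tables into one topic, you will probably also have to add the table and database names yourself.
Adding these fields to SQL Server change events is planned as part of https://issues.jboss.org/browse/DBZ-875.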
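As an illustration of using the topic name to identify the source, here is a minimal shell sketch that splits a SQL Server topic name such as test.schema01.table1 into its parts. It assumes the <database.server.name>.<schema>.<table> pattern and that none of the parts contains a dot; split_topic is a hypothetical helper, not part of Debezium.

```shell
#!/usr/bin/env bash
# Hypothetical helper: split a Debezium SQL Server topic name of the assumed form
# <database.server.name>.<schema>.<table> into its three parts.
# Assumes no part contains a dot (any extra dots end up in the table part).
split_topic() {
  local server schema table
  IFS=. read -r server schema table <<< "$1"
  printf 'server=%s schema=%s table=%s\n' "$server" "$schema" "$table"
}

split_topic "test.schema01.table1"
# prints: server=test schema=schema01 table=table1
```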
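Until that is available, one possible workaround (a sketch, not verified against this exact Debezium/Kafka version) is to record the original topic name with Kafka's built-in InsertField transform before RegexRouter renames it. InsertField$Value's topic.field option writes the record's current topic name into a value field, and because transforms run in the order listed, placing it ahead of topicRoute captures names like test.schema01.table1. The transform aliases addSourceTopic and addDbName and the field names source_topic and source_db are hypothetical; the database name is added as a static value because this connector captures a single database (database.dbname).

```shell
# Sketch: update the existing connector via the Kafka Connect REST API.
# PUT /connectors/<name>/config replaces the whole config, so every original key is repeated.
# addSourceTopic / addDbName and the fields source_topic / source_db are hypothetical names.
# Transforms run in the listed order, so addSourceTopic sees the original Debezium topic
# (e.g. test.schema01.table1) before topicRoute renames it to digital.user_profile.
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" \
  localhost:8083/connectors/cdc-user_profile-connector/config -d '{
  "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
  "tasks.max": "1",
  "database.hostname": "<<hostname>>",
  "database.port": "<<port>>",
  "database.user": "<<username>>",
  "database.password": "<<password>>",
  "database.server.name": "test",
  "database.dbname": "testDb",
  "table.whitelist": "schema01.table1,schema01.table2",
  "database.history.kafka.bootstrap.servers": "broker:9092",
  "database.history.kafka.topic": "digital.user_profile.schema.audit",
  "database.history.store.only.monitored.tables.ddl": true,
  "include.schema.changes": false,
  "event.deserialization.failure.handling.mode": "fail",
  "snapshot.mode": "initial_schema_only",
  "snapshot.locking.mode": "none",
  "transforms": "addStaticField,addSourceTopic,addDbName,topicRoute",
  "transforms.addStaticField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addStaticField.static.field": "source_system",
  "transforms.addStaticField.static.value": "source_system_1",
  "transforms.addSourceTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addSourceTopic.topic.field": "source_topic",
  "transforms.addDbName.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addDbName.static.field": "source_db",
  "transforms.addDbName.static.value": "testDb",
  "transforms.topicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.topicRoute.regex": "(.*)",
  "transforms.topicRoute.replacement": "digital.user_profile",
  "errors.tolerance": "none",
  "errors.log.enable": true,
  "errors.log.include.messages": true,
  "errors.retry.delay.max.ms": 60000,
  "errors.retry.timeout": 300000
}'
```

Note that this places source_topic and source_db at the top level of each event, next to source_system, rather than inside the source block; a consumer would then split source_topic to get the schema and table.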