Capture Data Change with Debezium but only take change records
I have two databases (db1 and db2), each with the same 4 tables, and the data in those 4 tables is identical in both. I now want to run CDC so that only changed data is captured and applied to db2. I don't want all the data from db1 pulled into the Kafka topic, but on its first run the connector pulls every existing row into the topic. What should the Debezium source connector configuration look like?
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
  "name": "mysql5-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "10",
    "database.hostname": "host",
    "database.port": "3307",
    "database.user": "root",
    "database.password": "secret",
    "database.server.id": "11",
    "database.server.name": "dbserver",
    "database.whitelist": "dbname",
    "table.whitelist": "dbname.exm1,dbname.exm4,dbname.exm2,dbname.exm3",
    "database.history.kafka.bootstrap.servers": "kafka:29092",
    "database.history.kafka.topic": "mysql5table",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",
    "transforms": "unwrap,dropTopicPrefix,pushed_on,first_transfer_date,mem_dob,pushed_date,AL_Date,A_Last_login,live_time,A_Date,callMeDate",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.dropTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropTopicPrefix.regex": "dbserver.indianmo_imc_new.(.*)",
    "transforms.dropTopicPrefix.replacement": ""
  }
}'
This is the connector I'm using now, but on its first run it extracts all the existing data. I only need the new records. Thanks in advance!
I think what you're looking for is
snapshot.mode=schema_only
https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-connector-properties
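For reference, a minimal sketch of a registration request with the snapshot mode set (connection values are copied from the question; the Avro converters and the transforms are omitted for brevity and can stay as they are). With schema_only, the connector only records the table schemas in the history topic at startup and then streams changes from the current binlog position, so none of the pre-existing rows are published:

# remove the old connector first; POSTing the same name again would be rejected
curl -i -X DELETE localhost:8083/connectors/mysql5-source

# re-register with snapshot.mode set so no initial data snapshot is taken
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
  "name": "mysql5-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "snapshot.mode": "schema_only",
    "database.hostname": "host",
    "database.port": "3307",
    "database.user": "root",
    "database.password": "secret",
    "database.server.id": "11",
    "database.server.name": "dbserver",
    "database.whitelist": "dbname",
    "table.whitelist": "dbname.exm1,dbname.exm4,dbname.exm2,dbname.exm3",
    "database.history.kafka.bootstrap.servers": "kafka:29092",
    "database.history.kafka.topic": "mysql5table"
  }
}'

One caveat: Kafka Connect keeps offsets per connector name, so if mysql5-source has already completed its snapshot, re-creating it under the same name resumes from the stored binlog position rather than snapshotting again, and the rows already written to the topics by that first snapshot remain there.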