MySQL changes to Kafka using Debezium: only DDL statements are captured
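I have configured MySQL with Debezium for CDC. It captures DDL changes such as CREATE/DROP TABLE, but it does not capture DML events.
I am using MySQL 8.0.11 and embedded Debezium version 0.8.3.Final.
No additional configuration was done on the MySQL server when the table was created.
The configuration bean is created with the following code: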
@Bean
public io.debezium.config.Configuration customerConnector() {
    return io.debezium.config.Configuration.create()
            .with(EmbeddedEngine.CONNECTOR_CLASS, "io.debezium.connector.mysql.MySqlConnector")
            .with(EmbeddedEngine.OFFSET_STORAGE, "org.apache.kafka.connect.storage.FileOffsetBackingStore")
            .with(EmbeddedEngine.OFFSET_STORAGE_FILE_FILENAME, "path-to-file")
            .with("offset.flush.interval.ms", 60000)
            .with(EmbeddedEngine.ENGINE_NAME, "customer-mysql-connector")
            .with(MySqlConnectorConfig.SERVER_NAME, databaseServer)
            .with(MySqlConnectorConfig.HOSTNAME, databaseServer)
            .with(MySqlConnectorConfig.PORT, databasePort)
            .with(MySqlConnectorConfig.USER, databaseUser)
            .with(MySqlConnectorConfig.PASSWORD, databasePassword)
            .with(MySqlConnectorConfig.DATABASE_WHITELIST, databaseSchemaName)
            .with(MySqlConnectorConfig.TABLE_WHITELIST, databaseTable)
            .with(MySqlConnectorConfig.DATABASE_HISTORY, MemoryDatabaseHistory.class.getName())
            .build();
}
Below are the logs from the Spring Boot application startup:
2020-05-29 21:24:28.028 INFO 5576 --- [pool-1-thread-1] i.d.connector.mysql.MySqlConnectorTask : MySQL has the binlog file 'binlog.000009' required by the connector
2020-05-29 21:24:28.072 INFO 5576 --- [pool-1-thread-1] io.debezium.util.Threads : Requested thread factory for connector MySqlConnector, id = localhost named = binlog-client
2020-05-29 21:24:28.074 INFO 5576 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-05-29 21:24:28.074 INFO 5576 --- [pool-1-thread-1] io.debezium.util.Threads : Creating thread debezium-mysqlconnector-localhost-binlog-client
2020-05-29 21:24:28.090 INFO 5576 --- [-localhost:3306] io.debezium.util.Threads : Creating thread debezium-mysqlconnector-localhost-binlog-client
2020-05-29 21:24:28.121 INFO 5576 --- [-localhost:3306] c.g.shyiko.mysql.binlog.BinaryLogClient : Connected to localhost:3306 at binlog.000009/3786 (sid:6293, cid:36)
2020-05-29 21:24:28.121 INFO 5576 --- [-localhost:3306] i.debezium.connector.mysql.BinlogReader : Connected to MySQL binlog at localhost:3306, starting at binlog file 'binlog.000009', pos=3786, skipping 8 events plus 0 rows
2020-05-29 21:24:28.121 INFO 5576 --- [-localhost:3306] io.debezium.util.Threads : Creating thread debezium-mysqlconnector-localhost-binlog-client
2020-05-29 21:24:28.183 INFO 5576 --- [ main] d.s.w.p.DocumentationPluginsBootstrapper : Context refreshed
2020-05-29 21:24:28.199 INFO 5576 --- [ main] d.s.w.p.DocumentationPluginsBootstrapper : Found 1 custom documentation plugin(s)
2020-05-29 21:24:28.199 INFO 5576 --- [ main] s.d.s.w.s.ApiListingReferenceScanner : Scanning for api listing references
Any clues?
Thanks!
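table.whitelist should be set to <schema>.<table>, because the connector matches this property against fully qualified table identifiers and a bare table name matches nothing.
So in your case: source.customer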
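To illustrate why the bare table name fails, here is a minimal sketch in plain Java with no Debezium dependency. It assumes the whitelist is a comma-separated list of regular expressions that must match the whole `<schema>.<table>` identifier, case-insensitively. The class `TableWhitelistSketch` and its methods are hypothetical helpers for illustration only, not part of the Debezium API, and the real connector filter may differ in detail.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Debezium: it only mimics the idea of
// matching whitelist entries against fully qualified table identifiers.
final class TableWhitelistSketch {

    // Builds the fully qualified identifier, e.g. "source.customer".
    static String qualify(String schema, String table) {
        return schema + "." + table;
    }

    // Assumption: each comma-separated entry is a regular expression that
    // must match the entire "<schema>.<table>" identifier.
    static boolean isWhitelisted(String whitelist, String schema, String table) {
        String id = qualify(schema, table);
        return Arrays.stream(whitelist.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .anyMatch(entry -> Pattern
                        .compile(entry, Pattern.CASE_INSENSITIVE)
                        .matcher(id)
                        .matches());
    }

    public static void main(String[] args) {
        // Bare table name: does not match "source.customer".
        System.out.println(isWhitelisted("customer", "source", "customer"));
        // Fully qualified name: matches.
        System.out.println(isWhitelisted(qualify("source", "customer"), "source", "customer"));
    }
}
```

In the configuration bean from the question, the equivalent change would be something like `.with(MySqlConnectorConfig.TABLE_WHITELIST, databaseSchemaName + "." + databaseTable)`, assuming `databaseTable` currently holds only the table name.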