使用 Kafka Connect 在 MySQL 个数据库之间同步数据

Sync data between MySQL Databases with Kafka Connect

我正在尝试使用基于 Kafka Connect 的 Confluent 在多个 MySQL 数据库之间同步数据。我在源连接器配置中使用 "bulk" 作为模式,因为主键类型是 varchar,所以我不能使用递增模式。它工作正常,但我遇到了两个问题:

  1. 似乎无法同步删除,在源数据库中删除数据时,接收器数据库没有任何反应。数据仍然存在于接收器数据库中。
  2. 同步数据需要一段时间。就我而言,将 table 与 3~4k 行同步大约需要 2~4 分钟。我可以理解使用批量模式可能会花费更多时间来同步数据,但这不会太长吗?

这是我的源连接器配置:

name=test-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://xxx.xxx.xxx:3306/xxx?useUnicode=true&characterEncoding=utf8
connection.user=user
connection.password=password
mode=bulk
table.whitelist=a_table

这是我的接收器连接器配置:

name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1 topics=a_table
connection.url=jdbc:mysql://xxx.xxx.xxx.xxx:3306/xxx?useUnicode=true&characterEncoding=utf8
connection.user=user
connection.password=password
insert.mode=upsert
pk.mode=record_value
pk.fields=mypk
auto.evolve=true

如有任何建议,我们将不胜感激。谢谢。

  1. 如果要同步删除,则需要使用CDC,例如Debezium。 JDBC 连接器只能检测存在的记录,不能检测不存在的记录。

  2. CDC 也比批量提取更有效,因为它会监控 MySQL 事务日志以查找所需 table 上的任何事务。

  3. 你的主键是VARCHAR?哇。如果您不想使用 CDC,我建议您使用基于 INT 的密钥,然后使用 JDBC 连接器进行增量加载。那,或者将时间戳列添加到 table,并将其用于增量。