使用 table 白名单选项更新 Debezium MySQL 连接器

Updating a Debezium MySQL connector with table whitelist option

我正在使用 Debezium (0.7.5) MySQL 连接器,我试图了解如果我想使用选项 table.whitelist 更新此配置,最好的方法是什么。

假设我创建了一个连接器,如下所示:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://debezium-host/connectors/ -d '
{
  "name": "MyConnector",
  "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "connect.timeout.ms": "60000",
      "tasks.max": "1",
      "database.hostname": "myhost",
      "database.port": "3306",
      "database.user": "***",
      "database.password": "***",
      "database.server.id": "3227197",
      "database.server.name": "MyServer",
      "database.whitelist": "myDb",
      "table.whitelist": "myDb.table1,myDb.table2",
      "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
      "database.history.kafka.topic": "MyConnectorHistoryTopic",
      "max.batch.size": "1024",
      "snapshot.mode": "initial",
      "decimal.handling.mode": "double"
    }
}'

过了一段时间(2 周)后,我需要向这个 table.whitelist 选项添加一个新的 table (myDb.table3)(这个 table 是一个旧的一,它是在连接器之前创建的)

我试过的是:

通过API更新命令:

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" https://kafka-connect-host/connectors/MyConnector/config/ -d '
{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "connect.timeout.ms": "60000",
  "tasks.max": "1",
  "database.hostname": "myhost",
  "database.port": "3306",
  "database.user": "***",
  "database.password": "***",
  "database.server.id": "3227197",
  "database.server.name": "MyServer",
  "database.whitelist": "myDb",
  "table.whitelist": "myDb.table1,myDb.table2,myDb.table3",
  "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
  "database.history.kafka.topic": "MyConnectorHistoryTopic",
  "max.batch.size": "1024",
  "snapshot.mode": "schema_only",
  "decimal.handling.mode": "double"
}'

但它没有用,也许这根本不是最好的方法。 在其他连接器中,我没有使用选项 table.whitelist,所以当我需要收听新的 table 时,我没有遇到这个问题。

我认为我的最后一个选择是删除此连接器并使用此新配置创建另一个同时监听新 table (myDb.table3) 的连接器。问题是,如果我想要来自 myDb.table3 的初始数据,我将不得不使用快照 initial 创建,但我不想从另一个 table 的快照生成所有消息s myDb.table1,myDb.table2

目前尚不支持对 whitelist/blacklist 配置的更改。目前正在处理此问题(请参阅 DBZ-175), and we hope to have preview support for this in one of the next releases. There's a pending PR,不过还需要做更多的工作。

在此之前,您最好的选择是设置一个新的连接器实例,捕获您感兴趣的其他表。这是在运行 两个连接器的价格(它们都将维护一个二进制日志 reader 会话),但只要您不需要经常更改过滤器配置,它就可以解决问题。

Debezium Server最新版本,可以添加如下配置

debezium.snapshot.new.tables=parallel

如果你使用的是 Debezium,你可以尝试这个配置值

snapshot.new.tables=parallel

注意:Debeziyum 服务器是支持 Kinesis、GooglePub sub 和 Apache Pulsar 的服务器。我正在使用它,它的配置有点不同。我必须在每个项目前添加“debezium”

添加此配置后,对 tables.whitelist 的任何添加,对于这些额外的表,Debezium 都将创建快照。

我无法向您指出文档,但我在 GitHub 中浏览了他们的代码,而且我还实际尝试了它,这对我有用。 这是 link 到 MySqlConnector 代码

https://github.com/debezium/debezium/blob/master/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java

有搜索 Field.create("snapshot.new.tables")

个人感觉Debezium东西很多,但是文档比较散乱

我有同样的问题,并通过向 debezium 发送信号 table 来解决。它是这样工作的,你必须创建一个 table 来发送数据中的 debezium 命令 table.

CREATE TABLE public.debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32)  NULL, data VARCHAR(2048)  NULL);

并在您的配置中设置 debzium a 标签 "signal.data.collection": "public.debezium_signal"

之后你可以发送带有插入的命令 table:

INSERT INTO debezium_signal (id, type, data)
VALUES(gen_random_uuid(),'execute-snapshot','{"data-collections": "myDb.table3"]}');

在我的例子中,我必须在 table.include.list 和 column.include.list 中添加 de table 信号。

https://debezium.io/documentation/reference/stable/configuration/signalling.html