How to tell the Debezium MySQL source connector to stop retaking snapshots of existing tables into Kafka topics?

I am using the Debezium MySQL CDC source connector to move a database from MySQL to Kafka. The connector works fine except for its snapshot behavior: it completed the initial snapshot successfully, then went down a few hours later due to a heap memory limit (that issue is not the problem here). I paused the connector, stopped the workers on the cluster, fixed the memory issue, and started the workers again. The connector is now running fine, but it is taking the snapshot all over again! It looks like the connector did not resume from where it left off, so I suspect something is wrong with my configuration. I am using Debezium 0.9.5.

I changed snapshot.mode=initial to initial_only, but that did not help.
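
Note that initial_only tells the connector to take the snapshot and then stop, without ever streaming the binlog, so it is usually not what you want for continuous replication; when_needed snapshots only when no usable offset exists. Also, a config change only takes effect when applied through the Kafka Connect REST API (editing a local file does not alter an already-registered connector). A minimal sketch, assuming the worker is reachable at connect:8083 (the connector name comes from the config below):

import json
import requests  # third-party HTTP client: pip install requests

CONNECT_URL = "http://connect:8083"      # assumed worker address
CONNECTOR = "mysql-cdc-replication"      # "name" from the connector config

# Fetch the current connector config.
config = requests.get(f"{CONNECT_URL}/connectors/{CONNECTOR}/config").json()

# Change only the snapshot mode; PUT /connectors/{name}/config replaces
# the whole config, so send the full map back.
config["snapshot.mode"] = "when_needed"
resp = requests.put(
    f"{CONNECT_URL}/connectors/{CONNECTOR}/config",
    headers={"Content-Type": "application/json"},
    data=json.dumps(config),
)
resp.raise_for_status()
print(resp.json())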

Connector properties:

{
  "properties": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "snapshot.locking.mode": "minimal",
    "errors.log.include.messages": "false",
    "table.blacklist": "mydb.someTable",
    "include.schema.changes": "true",
    "database.jdbc.driver": "com.mysql.cj.jdbc.Driver",
    "database.history.kafka.recovery.poll.interval.ms": "100",
    "poll.interval.ms": "500",
    "heartbeat.topics.prefix": "__debezium-heartbeat",
    "binlog.buffer.size": "0",
    "errors.log.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "snapshot.fetch.size": "100000",
    "errors.retry.timeout": "0",
    "database.user": "kafka_readonly",
    "database.history.kafka.bootstrap.servers": "bootstrap:9092",
    "internal.database.history.ddl.filter": "DROP TEMPORARY TABLE IF EXISTS .+ /\* generated by server \*/,INSERT INTO mysql.rds_heartbeat2\(.*\) values \(.*\) ON DUPLICATE KEY UPDATE value \u003d .*,FLUSH RELAY LOGS.*,flush relay logs.*",
    "heartbeat.interval.ms": "0",
    "header.converter": "org.apache.kafka.connect.json.JsonConverter",
    "autoReconnect": "true",
    "inconsistent.schema.handling.mode": "fail",
    "enable.time.adjuster": "true",
    "gtid.new.channel.position": "latest",
    "ddl.parser.mode": "antlr",
    "database.password": "pw",
    "name": "mysql-cdc-replication",
    "errors.tolerance": "none",
    "database.history.store.only.monitored.tables.ddl": "false",
    "gtid.source.filter.dml.events": "true",
    "max.batch.size": "2048",
    "connect.keep.alive": "true",
    "database.history": "io.debezium.relational.history.KafkaDatabaseHistory",
    "snapshot.mode": "initial_only",
    "connect.timeout.ms": "30000",
    "max.queue.size": "8192",
    "tasks.max": "1",
    "database.history.kafka.topic": "history-topic",
    "snapshot.delay.ms": "0",
    "database.history.kafka.recovery.attempts": "100",
    "tombstones.on.delete": "true",
    "decimal.handling.mode": "double",
    "snapshot.new.tables": "parallel",
    "database.history.skip.unparseable.ddl": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "table.ignore.builtin": "true",
    "database.whitelist": "mydb",
    "bigint.unsigned.handling.mode": "long",
    "database.server.id": "6022",
    "event.deserialization.failure.handling.mode": "fail",
    "time.precision.mode": "adaptive_time_microseconds",
    "errors.retry.delay.max.ms": "60000",
    "database.server.name": "host",
    "database.port": "3306",
    "database.ssl.mode": "disabled",
    "database.serverTimezone": "UTC",
    "task.class": "io.debezium.connector.mysql.MySqlConnectorTask",
    "database.hostname": "host",
    "database.server.id.offset": "10000",
    "connect.keep.alive.interval.ms": "60000",
    "include.query": "false"
  }
}
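
Whether a snapshot re-runs on restart is decided by the source offset the worker finds for the connector: if the offsets topic no longer holds a binlog position for this connector name, Debezium treats the start as a fresh deployment and snapshots again. A rough way to inspect this, using the kafka-python client and assuming the worker's default offsets topic name connect-offsets (set by offset.storage.topic) and the broker address from the config above:

from kafka import KafkaConsumer  # pip install kafka-python

# Read the Connect offsets topic from the beginning; keys identify the
# connector, values hold the stored source offset (binlog file/position).
consumer = KafkaConsumer(
    "connect-offsets",                  # worker's offset.storage.topic (assumed default)
    bootstrap_servers="bootstrap:9092",
    auto_offset_reset="earliest",
    enable_auto_commit=False,
    consumer_timeout_ms=10000,          # stop iterating once caught up
)

for msg in consumer:
    key = msg.key.decode("utf-8") if msg.key else None
    value = msg.value.decode("utf-8") if msg.value else None  # None = tombstone
    if key and "mysql-cdc-replication" in key:
        print(key, "->", value)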

I can confirm Gunnar's answer above. I ran into some problems during the snapshot phase and had to restart the whole snapshot process. At the moment the connector does not support resuming a snapshot from a given point. Your configuration looks fine to me. Hope this helps.
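
To confirm that the re-run snapshot has completed and the connector has switched to streaming the binlog, you can check the task state via the Connect REST API and watch the worker log. A small sketch against the same assumed worker address:

import requests  # pip install requests

CONNECT_URL = "http://connect:8083"   # assumed worker address
CONNECTOR = "mysql-cdc-replication"

# GET /connectors/{name}/status reports connector and per-task state.
status = requests.get(f"{CONNECT_URL}/connectors/{CONNECTOR}/status").json()
print(status["connector"]["state"])              # e.g. RUNNING
for task in status["tasks"]:
    print(task["id"], task["state"], task.get("trace", ""))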