来自 kafka-connect 的 Postgres 复制槽已满

Postgres replication slot from kafka-connect is filling up

kafka-connector 连接器创建的复制槽已满。

我在 AWS 上有一个 postgres RDS 数据库。我将以下参数组选项放在上面(仅显示与默认值的差异)

rds.logical_replication: 1

我有 kafka 连接 运行 一个 debezium postgres 连接器。这是配置(当然,某些值已编辑)

"database.dbname"        = "mydb"
"database.hostname"      = "myhostname"
"database.password"      = "mypass"
"database.port"          = "myport"
"database.server.name"   = "postgres"
"database.user"          = "myuser"
"database.whitelist"     = "my_database"
"include.schema.changes" = "false"
"plugin.name"            = "wal2json_streaming"
"slot.name"              = "my_slotname"
"snapshot.mode"          = "never"
"table.whitelist"        = "public.mytable"
"tombstones.on.delete"   = "false"
"transforms"             = "key"
"transforms.key.field"   = "id"
"transforms.key.type"    = "org.apache.kafka.connect.transforms.ExtractField$Key"

如果我得到这个连接器的状态,它似乎没问题。

curl -s http://my.kafkaconnect.url:kc_port/connectors/my-connector/status | jq

{
  "name": "my-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "some_ip"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "some_ip"
    }
  ],
  "type": "source"
}

然而,postgres 中的复制槽越来越大:

SELECT slot_name,
  pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as replicationSlotLag,
  pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) as confirmedLag,
  active
FROM pg_replication_slots;
           slot_name           | replicationslotlag | confirmedlag | active
-------------------------------+--------------------+--------------+--------
 my_slotname                   | 20 GB              | 20 GB        | t

为什么复制不断增长?据我了解,运行 的 kafka connect 连接器任务应该从这个复制槽中读取,将其发布到主题 postgres. public.mytable,然后复制槽的大小应该减小。我是否在这一系列操作中遗漏了什么?

请看WAL Diskspace Consumption

PostgreSQL WAL 积压的最常见原因是连接器正在监视数据库,或者与环境中的其他表或数据库相比,数据库中的表子集更改频率更低,因此连接器不可用足够频繁地确认 LSN 以避免 WAL 积压。

对于 Debezium 1.0.x 及更早版本,启用 heartbeat.interval.ms
对于 Debezium 1.1.0 及之后的版本,还可以考虑启用 heartbeat.action.query

在 Gunnar 提到的地方找到了这个 google group discussion -

core heartbeat feature regularly emits messages to the heartbeat topic, allowing to acknowledge processed WAL offsets also in case only events in filtered tables occur (this is what you observe). Heartbeat action queries(需要 table 并包含在发布中)对于解决多个数据库的情况很有用,在这种情况下,连接器从一个数据库接收更改,否则 no/low流量,在这种情况下再次允许确认偏移量。

- 冈纳尔

在小组讨论中,他提到我们必须将此心跳 table 添加到发布中才能使此心跳查询起作用。这应该有所帮助。