Debezium Postgres Kafka Connector heartbeat is not committing LSN
I have a Postgres DB on AWS RDS and a Kafka connector (Debezium Postgres) listening on a table. The connector's configuration:
{
  "name": "my-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.dbname": "my_db",
    "database.user": "my_user",
    "max.queue.size": "32000",
    "slot.name": "my_slot",
    "tasks.max": "1",
    "publication.name": "my_publication",
    "database.server.name": "postgres",
    "heartbeat.interval.ms": "1000",
    "database.port": "my_port",
    "include.schema.changes": "false",
    "plugin.name": "pgoutput",
    "table.whitelist": "public.my_table",
    "tombstones.on.delete": "false",
    "database.hostname": "my_host",
    "database.password": "my_password",
    "name": "my-connector",
    "max.batch.size": "10000",
    "database.whitelist": "my_db",
    "snapshot.mode": "never"
  },
  "tasks": [
    {
      "connector": "my-connector",
      "task": 0
    }
  ],
  "type": "source"
}
The table is not updated as frequently as the other tables, which initially led to replication lag like the following:
SELECT slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as replicationSlotLag,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) as confirmedLag,
active
FROM pg_replication_slots;
slot_name | replicationslotlag | confirmedlag | active
-------------------------------+--------------------+--------------+--------
my_slot | 1664 MB | 1664 MB | t
It gets so large that it could eventually eat all the disk space.
I added the heartbeat, and if I log in to the Kafka broker and start a console consumer like this:
./kafka-console-consumer.sh --bootstrap-server my.broker.address:9092 --topic __debezium-heartbeat.postgres --from-beginning --consumer.config=/etc/kafka/consumer.properties
it dumps all the existing heartbeat messages and then prints a new one every 1000 ms.
However, the slot keeps growing anyway. If I do something like insert a dummy record into the table, that resets the slot back to a small lag, so that works.
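For reference, a minimal sketch of that dummy-record workaround (the id and updated_at columns are assumptions, since my_table's real schema isn't shown here); any committed write on a captured table gives the connector a change event whose LSN it can confirm back to the slot:
-- Hypothetical upsert into the captured table (assumed columns).
-- The resulting change event lets the connector acknowledge a recent LSN,
-- so confirmed_flush_lsn advances and the slot lag shrinks.
INSERT INTO public.my_table (id, updated_at)
VALUES (-1, now())
ON CONFLICT (id) DO UPDATE SET updated_at = now();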
I would really like to achieve this with the heartbeat alone, though. I don't want to insert periodic messages, since that sounds like added complexity. Why doesn't the heartbeat shrink the slot?
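(For context: later Debezium versions also expose a heartbeat.action.query connector option that executes a statement against the source database at each heartbeat interval, so the LSN can advance without an application-side job. A sketch of how that might look in this config, assuming a dedicated public.debezium_heartbeat table that exists and is included in the publication and whitelist; whether the option is available depends on the Debezium version in use:
"heartbeat.interval.ms": "1000",
"heartbeat.action.query": "INSERT INTO public.debezium_heartbeat (id, ts) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET ts = now()"
)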