Postgres Debezium 不发布记录的先前状态
Postgres Debezium does not publish the previous state of a record
我成功安装了Postgres Debezium CDC
。现在,我能够捕捉到数据库发生的所有变化。但问题是 "before" 字段始终为空。所以,如果我插入一条记录 (id = 1, name = Bill)
然后我从 Kafka 得到这个数据:
'payload': {'before': None, 'after': {'id': 1, 'name': 'Bill'}, ...
但是如果我这样更新记录:
UPDATE mytable set name = 'Bob' WHERE id = 1
我从卡夫卡得到这个:
'payload': {'before': None, 'after': {'id': 1, 'name': 'Bob'}, ...
这是我配置连接器的方式:
curl -X POST localhost:8083/connectors/ \
-H "Accept:application/json" -H "Content-Type:application/json" -d \
'{
"name": "test-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"plugin.name": "pgoutput",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "test",
"database.server.name": "postgres",
"database.whitelist": "public.mytable",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "public.topic"
}
}'
这有什么问题,我该如何解决?
before
是可选字段,如果存在,则包含事件发生前行的状态。此字段是否可用在很大程度上取决于每个 table 的 REPLICA IDENTITY
设置。
REPLICA IDENTITY
是 PostgreSQL 特定的 table 级别设置,它确定在 UPDATE
和 DELETE
事件的情况下可用于逻辑解码的信息量。
要显示所有 table 列的先前值,请将 REPLICA IDENTITY
级别设置为 FULL
:
ALTER TABLE public.mytable REPLICA IDENTITY FULL;
在 Debezium docs 中查看更多详细信息。
我成功安装了Postgres Debezium CDC
。现在,我能够捕捉到数据库发生的所有变化。但问题是 "before" 字段始终为空。所以,如果我插入一条记录 (id = 1, name = Bill)
然后我从 Kafka 得到这个数据:
'payload': {'before': None, 'after': {'id': 1, 'name': 'Bill'}, ...
但是如果我这样更新记录:
UPDATE mytable set name = 'Bob' WHERE id = 1
我从卡夫卡得到这个:
'payload': {'before': None, 'after': {'id': 1, 'name': 'Bob'}, ...
这是我配置连接器的方式:
curl -X POST localhost:8083/connectors/ \
-H "Accept:application/json" -H "Content-Type:application/json" -d \
'{
"name": "test-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"plugin.name": "pgoutput",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "test",
"database.server.name": "postgres",
"database.whitelist": "public.mytable",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "public.topic"
}
}'
这有什么问题,我该如何解决?
before
是可选字段,如果存在,则包含事件发生前行的状态。此字段是否可用在很大程度上取决于每个 table 的 REPLICA IDENTITY
设置。
REPLICA IDENTITY
是 PostgreSQL 特定的 table 级别设置,它确定在 UPDATE
和 DELETE
事件的情况下可用于逻辑解码的信息量。
要显示所有 table 列的先前值,请将 REPLICA IDENTITY
级别设置为 FULL
:
ALTER TABLE public.mytable REPLICA IDENTITY FULL;
在 Debezium docs 中查看更多详细信息。