使用 psycopg2 和发布者进行逻辑复制
Logical replication with psycopg2 and publishers
有写在python和psycopg2中的逻辑复制脚本:
https://www.psycopg.org/docs/advanced.html#logical-replication-quick-start
有效。但我不想接收数据库中的所有操作,而只想接收发布者中指定的操作。
为此:
我在源数据库(版本 13.3)中创建了名为“PUB”和一个 table 的发布者。我还部署了第二个数据库(版本 14.1)并在其中创建了对此出版物的订阅,以检查该出版物的功能。并且有效。
在 psycopg2 文档 (https://www.psycopg.org/docs/extras.html#replication-support-objects) and postgresql documentation (https://postgrespro.ru/docs/postgresql/13/protocol-logical-replication?lang=en) 中,我找到了一种在 start_replication
中设置 publication_names 的方法
cur.start_replication(slot_name='pytest', decode=True, options={'publication_names': 'PUB'})
但是没有用。我收到错误:
psycopg2.errors.InvalidParameterValue: option "publication_names" = "PUB," is unknown
原来是这个问题。 psycorpg2 中的逻辑复制可以与订阅相关联吗?如果是,怎么做?
P.s。有替代方案(Trigger/Notify -> Listen/psycopg2),但我想尽可能解决这个问题。
test_decoding 插件不支持 publication_names 选项。
内置PUBLICATION使用的插件叫'pgoutput',不是'test_decoding'。但据我所知,psycopg2 不支持使用该插件。
所以你想要的不被支持。我不知道是否有人正在努力增加对它的支持。
你的问题的答案可能是“是和否”。
作为jjanes states in their ,您必须使用pgoutput
输出插件才能使用出版物。您可以配置 psycopg2
以像这样使用它:
conn = psycopg2.connect(
dbname='test',
connection_factory=psycopg2.extras.LogicalReplicationConnection,
)
options = {'publication_names': 'pub', 'proto_version': '1'}
cur = conn.cursor()
try:
cur.start_replication(slot_name='pytest', decode=False, options=options)
except psycopg2.ProgrammingError:
cur.create_replication_slot(
'pytest',
output_plugin='pgoutput',
)
cur.start_replication(slot_name='pytest', decode=False, options=options)
但是pgoutput
使用二进制协议,所以这些语句的输出
insert into table1 (col) values ('hello hello');
update table1 set col = 'hello goodbye' where id = 6;
是
b'B\x00\x00\x00\x00r!<x\x00\x02t\xc65\xbb\xb5\xd3\x00\x00jd'
b'R\x00\x00F:public\x00table1\x00d\x00\x02\x01id\x00\x00\x00\x00\x17\xff\xff\xff\xff\x00col\x00\x00\x00\x04\x13\x00\x00\x00\x14'
b'I\x00\x00F:N\x00\x02t\x00\x00\x00\x016t\x00\x00\x00\x0bhello hello'
b'C\x00\x00\x00\x00\x00r!<x\x00\x00\x00\x00r!<\xa8\x00\x02t\xc65\xbb\xb5\xd3'
b'B\x00\x00\x00\x00r!=8\x00\x02t\xc67`\xb5\xae\x00\x00je'
b'U\x00\x00F:N\x00\x02t\x00\x00\x00\x016t\x00\x00\x00\rhello goodbye'
b'C\x00\x00\x00\x00\x00r!=8\x00\x00\x00\x00r!=h\x00\x02t\xc67`\xb5\xae'
因为psycopg2
不知道如何解码消息。你可以自己解码它们,使用 struct module from Python's standard library and the published message formats, or you could consider using wal2json instead of pgoutput
. wal2json
does not currently support publications, but it does provide the means to emulate 它们。
有写在python和psycopg2中的逻辑复制脚本: https://www.psycopg.org/docs/advanced.html#logical-replication-quick-start
有效。但我不想接收数据库中的所有操作,而只想接收发布者中指定的操作。
为此:
我在源数据库(版本 13.3)中创建了名为“PUB”和一个 table 的发布者。我还部署了第二个数据库(版本 14.1)并在其中创建了对此出版物的订阅,以检查该出版物的功能。并且有效。
在 psycopg2 文档 (https://www.psycopg.org/docs/extras.html#replication-support-objects) and postgresql documentation (https://postgrespro.ru/docs/postgresql/13/protocol-logical-replication?lang=en) 中,我找到了一种在 start_replication
中设置 publication_names 的方法
cur.start_replication(slot_name='pytest', decode=True, options={'publication_names': 'PUB'})
但是没有用。我收到错误:
psycopg2.errors.InvalidParameterValue: option "publication_names" = "PUB," is unknown
原来是这个问题。 psycorpg2 中的逻辑复制可以与订阅相关联吗?如果是,怎么做?
P.s。有替代方案(Trigger/Notify -> Listen/psycopg2),但我想尽可能解决这个问题。
test_decoding 插件不支持 publication_names 选项。
内置PUBLICATION使用的插件叫'pgoutput',不是'test_decoding'。但据我所知,psycopg2 不支持使用该插件。
所以你想要的不被支持。我不知道是否有人正在努力增加对它的支持。
你的问题的答案可能是“是和否”。
作为jjanes states in their pgoutput
输出插件才能使用出版物。您可以配置 psycopg2
以像这样使用它:
conn = psycopg2.connect(
dbname='test',
connection_factory=psycopg2.extras.LogicalReplicationConnection,
)
options = {'publication_names': 'pub', 'proto_version': '1'}
cur = conn.cursor()
try:
cur.start_replication(slot_name='pytest', decode=False, options=options)
except psycopg2.ProgrammingError:
cur.create_replication_slot(
'pytest',
output_plugin='pgoutput',
)
cur.start_replication(slot_name='pytest', decode=False, options=options)
但是pgoutput
使用二进制协议,所以这些语句的输出
insert into table1 (col) values ('hello hello');
update table1 set col = 'hello goodbye' where id = 6;
是
b'B\x00\x00\x00\x00r!<x\x00\x02t\xc65\xbb\xb5\xd3\x00\x00jd'
b'R\x00\x00F:public\x00table1\x00d\x00\x02\x01id\x00\x00\x00\x00\x17\xff\xff\xff\xff\x00col\x00\x00\x00\x04\x13\x00\x00\x00\x14'
b'I\x00\x00F:N\x00\x02t\x00\x00\x00\x016t\x00\x00\x00\x0bhello hello'
b'C\x00\x00\x00\x00\x00r!<x\x00\x00\x00\x00r!<\xa8\x00\x02t\xc65\xbb\xb5\xd3'
b'B\x00\x00\x00\x00r!=8\x00\x02t\xc67`\xb5\xae\x00\x00je'
b'U\x00\x00F:N\x00\x02t\x00\x00\x00\x016t\x00\x00\x00\rhello goodbye'
b'C\x00\x00\x00\x00\x00r!=8\x00\x00\x00\x00r!=h\x00\x02t\xc67`\xb5\xae'
因为psycopg2
不知道如何解码消息。你可以自己解码它们,使用 struct module from Python's standard library and the published message formats, or you could consider using wal2json instead of pgoutput
. wal2json
does not currently support publications, but it does provide the means to emulate 它们。