使用 psycopg2 和发布者进行逻辑复制

Logical replication with psycopg2 and publishers

有写在python和psycopg2中的逻辑复制脚本: https://www.psycopg.org/docs/advanced.html#logical-replication-quick-start

有效。但我不想接收数据库中的所有操作,而只想接收发布者中指定的操作。

为此:

  1. 我在源数据库(版本 13.3)中创建了名为“PUB”和一个 table 的发布者。我还部署了第二个数据库(版本 14.1)并在其中创建了对此出版物的订阅,以检查该出版物的功能。并且有效。

  2. 在 psycopg2 文档 (https://www.psycopg.org/docs/extras.html#replication-support-objects) and postgresql documentation (https://postgrespro.ru/docs/postgresql/13/protocol-logical-replication?lang=en) 中,我找到了一种在 start_replication

    中设置 publication_names 的方法
cur.start_replication(slot_name='pytest', decode=True, options={'publication_names': 'PUB'})

但是没有用。我收到错误:

psycopg2.errors.InvalidParameterValue: option "publication_names" = "PUB," is unknown

原来是这个问题。 psycorpg2 中的逻辑复制可以与订阅相关联吗?如果是,怎么做?

P.s。有替代方案(Trigger/Notify -> Listen/psycopg2),但我想尽可能解决这个问题。

test_decoding 插件不支持 publication_names 选项。

内置PUBLICATION使用的插件叫'pgoutput',不是'test_decoding'。但据我所知,psycopg2 不支持使用该插件。

所以你想要的不被支持。我不知道是否有人正在努力增加对它的支持。

你的问题的答案可能是“是和否”。

作为jjanes states in their ,您必须使用pgoutput输出插件才能使用出版物。您可以配置 psycopg2 以像这样使用它:

conn = psycopg2.connect(
    dbname='test',
    connection_factory=psycopg2.extras.LogicalReplicationConnection,
)

options = {'publication_names': 'pub', 'proto_version': '1'}
cur = conn.cursor()
try:
    cur.start_replication(slot_name='pytest', decode=False, options=options)
except psycopg2.ProgrammingError:
    cur.create_replication_slot(
        'pytest',
        output_plugin='pgoutput',
    )
    cur.start_replication(slot_name='pytest', decode=False, options=options)

但是pgoutput使用二进制协议,所以这些语句的输出

insert into table1 (col) values ('hello hello');
update table1 set col = 'hello goodbye' where id = 6;

b'B\x00\x00\x00\x00r!<x\x00\x02t\xc65\xbb\xb5\xd3\x00\x00jd'
b'R\x00\x00F:public\x00table1\x00d\x00\x02\x01id\x00\x00\x00\x00\x17\xff\xff\xff\xff\x00col\x00\x00\x00\x04\x13\x00\x00\x00\x14'
b'I\x00\x00F:N\x00\x02t\x00\x00\x00\x016t\x00\x00\x00\x0bhello hello'
b'C\x00\x00\x00\x00\x00r!<x\x00\x00\x00\x00r!<\xa8\x00\x02t\xc65\xbb\xb5\xd3'
b'B\x00\x00\x00\x00r!=8\x00\x02t\xc67`\xb5\xae\x00\x00je'
b'U\x00\x00F:N\x00\x02t\x00\x00\x00\x016t\x00\x00\x00\rhello goodbye'
b'C\x00\x00\x00\x00\x00r!=8\x00\x00\x00\x00r!=h\x00\x02t\xc67`\xb5\xae'

因为psycopg2不知道如何解码消息。你可以自己解码它们,使用 struct module from Python's standard library and the published message formats, or you could consider using wal2json instead of pgoutput. wal2json does not currently support publications, but it does provide the means to emulate 它们。