postgresql:逻辑复制是否包括回滚事务?

postgresql: does logical replication include rollback transaction?

逻辑复制是否以事务为单位解析WAL文件?回滚事务呢?

而且,什么是 API 可以在接收端进入数据更改,而不在 SQL 级别重放它们?就像postgresql内置的流复制receiver一样,无论是逻辑还是物理。

编辑:

让我再澄清一下我的问题。

流媒体逻辑流程如下图:

postgresql实例(发送方,创建带有特定输出插件的插槽)--------流式协议------------> postgresql实例(接收方,获取复制数据)

这里复制数据的格式由输出插件决定,假设是纯文本。那么直接的,我们可以把它当成SQL语句,在receiver postgresql中replay,但是显然效率很低。是否有任何低级API进入副本数据?

我花了一些时间研究源代码,并尝试自己回答这个问题。

  • postgresql walsender 只包含提交的事务,忽略中止的事务。

相关代码路径和片段:

pg_logical_slot_get_changes_guts() -> LogicalDecodingProcessRecord() -> DecodeXactOp() -> 
ReorderBufferCommit() -> ReorderBufferIterTXNNext()

DecodeXactOp():

switch (info)
{
    case XLOG_XACT_COMMIT:
    case XLOG_XACT_COMMIT_PREPARED:
        {
            xl_xact_commit *xlrec;
            xl_xact_parsed_commit parsed;
            TransactionId xid;

            xlrec = (xl_xact_commit *) XLogRecGetData(r);
            ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);

            if (!TransactionIdIsValid(parsed.twophase_xid))
                xid = XLogRecGetXid(r);
            else
                xid = parsed.twophase_xid;

            DecodeCommit(ctx, buf, &parsed, xid);
            break;
        }
  • 内置的逻辑应用工作者不重播 SQL,但使用许多内部 API 在接收方应用更改。

https://github.com/postgres/postgres/blob/master/src/backend/replication/logical/worker.c#L585

/* Input functions may need an active snapshot, so get one */
PushActiveSnapshot(GetTransactionSnapshot());

/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel, newtup.values);
slot_fill_defaults(rel, estate, remoteslot);
MemoryContextSwitchTo(oldctx);

ExecOpenIndices(estate->es_result_relation_info, false);

/* Do the insert. */
ExecSimpleRelationInsert(estate, remoteslot);

/* Cleanup. */
ExecCloseIndices(estate->es_result_relation_info);
PopActiveSnapshot();

PG10的逻辑复制使用pgoutput作为输出插件,是二进制格式,适合直接ingress数据

https://github.com/postgres/postgres/blob/master/src/backend/replication/pgoutput/pgoutput.c