postgresql:逻辑复制是否包括回滚事务?
postgresql: does logical replication include rollback transaction?
逻辑复制是否以事务为单位解析WAL文件?回滚事务呢?
而且,什么是 API 可以在接收端进入数据更改,而不在 SQL 级别重放它们?就像postgresql内置的流复制receiver一样,无论是逻辑还是物理。
编辑:
让我再澄清一下我的问题。
流媒体逻辑流程如下图:
postgresql实例(发送方,创建带有特定输出插件的插槽)--------流式协议------------> postgresql实例(接收方,获取复制数据)
这里复制数据的格式由输出插件决定,假设是纯文本。那么直接的,我们可以把它当成SQL语句,在receiver postgresql中replay,但是显然效率很低。是否有任何低级API进入副本数据?
我花了一些时间研究源代码,并尝试自己回答这个问题。
- postgresql walsender 只包含提交的事务,忽略中止的事务。
相关代码路径和片段:
pg_logical_slot_get_changes_guts() -> LogicalDecodingProcessRecord() -> DecodeXactOp() ->
ReorderBufferCommit() -> ReorderBufferIterTXNNext()
DecodeXactOp():
switch (info)
{
case XLOG_XACT_COMMIT:
case XLOG_XACT_COMMIT_PREPARED:
{
xl_xact_commit *xlrec;
xl_xact_parsed_commit parsed;
TransactionId xid;
xlrec = (xl_xact_commit *) XLogRecGetData(r);
ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
if (!TransactionIdIsValid(parsed.twophase_xid))
xid = XLogRecGetXid(r);
else
xid = parsed.twophase_xid;
DecodeCommit(ctx, buf, &parsed, xid);
break;
}
- 内置的逻辑应用工作者不重播 SQL,但使用许多内部 API 在接收方应用更改。
https://github.com/postgres/postgres/blob/master/src/backend/replication/logical/worker.c#L585
/* Input functions may need an active snapshot, so get one */
PushActiveSnapshot(GetTransactionSnapshot());
/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel, newtup.values);
slot_fill_defaults(rel, estate, remoteslot);
MemoryContextSwitchTo(oldctx);
ExecOpenIndices(estate->es_result_relation_info, false);
/* Do the insert. */
ExecSimpleRelationInsert(estate, remoteslot);
/* Cleanup. */
ExecCloseIndices(estate->es_result_relation_info);
PopActiveSnapshot();
PG10的逻辑复制使用pgoutput
作为输出插件,是二进制格式,适合直接ingress数据
https://github.com/postgres/postgres/blob/master/src/backend/replication/pgoutput/pgoutput.c
逻辑复制是否以事务为单位解析WAL文件?回滚事务呢?
而且,什么是 API 可以在接收端进入数据更改,而不在 SQL 级别重放它们?就像postgresql内置的流复制receiver一样,无论是逻辑还是物理。
编辑:
让我再澄清一下我的问题。
流媒体逻辑流程如下图:
postgresql实例(发送方,创建带有特定输出插件的插槽)--------流式协议------------> postgresql实例(接收方,获取复制数据)
这里复制数据的格式由输出插件决定,假设是纯文本。那么直接的,我们可以把它当成SQL语句,在receiver postgresql中replay,但是显然效率很低。是否有任何低级API进入副本数据?
我花了一些时间研究源代码,并尝试自己回答这个问题。
- postgresql walsender 只包含提交的事务,忽略中止的事务。
相关代码路径和片段:
pg_logical_slot_get_changes_guts() -> LogicalDecodingProcessRecord() -> DecodeXactOp() ->
ReorderBufferCommit() -> ReorderBufferIterTXNNext()
DecodeXactOp():
switch (info)
{
case XLOG_XACT_COMMIT:
case XLOG_XACT_COMMIT_PREPARED:
{
xl_xact_commit *xlrec;
xl_xact_parsed_commit parsed;
TransactionId xid;
xlrec = (xl_xact_commit *) XLogRecGetData(r);
ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
if (!TransactionIdIsValid(parsed.twophase_xid))
xid = XLogRecGetXid(r);
else
xid = parsed.twophase_xid;
DecodeCommit(ctx, buf, &parsed, xid);
break;
}
- 内置的逻辑应用工作者不重播 SQL,但使用许多内部 API 在接收方应用更改。
https://github.com/postgres/postgres/blob/master/src/backend/replication/logical/worker.c#L585
/* Input functions may need an active snapshot, so get one */
PushActiveSnapshot(GetTransactionSnapshot());
/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel, newtup.values);
slot_fill_defaults(rel, estate, remoteslot);
MemoryContextSwitchTo(oldctx);
ExecOpenIndices(estate->es_result_relation_info, false);
/* Do the insert. */
ExecSimpleRelationInsert(estate, remoteslot);
/* Cleanup. */
ExecCloseIndices(estate->es_result_relation_info);
PopActiveSnapshot();
PG10的逻辑复制使用pgoutput
作为输出插件,是二进制格式,适合直接ingress数据
https://github.com/postgres/postgres/blob/master/src/backend/replication/pgoutput/pgoutput.c