雪花使用流跟踪 Updates/Deletes 到 Table

Snowflake Using Streams to Track Updates/Deletes to a Table

我无法理解 Streams 在跟踪更改方面的工作原理。我想创建一个历史记录 table 来跟踪每个 UPDATEDELETE 到 table,但我发现我不明白它是如何工作的。

如果我有 table Table1 和流:

 CREATE TABLE Table1
 (
   XID INT IDENTITY PRIMARY KEY,
   FIELD1 INT,
   FIELD2 STRING,
   DATECREATED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
 );

 CREATE STREAM Table1_History ON TABLE Table1;

如果我插入数据:

INSERT INTO Table1 (FIELD1,FIELD2)
VALUES
(101,'String1'),
(102,'String2')
;

然后运行:

SELECT * FROM Table1_History;

它returns以下内容:

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
1   101 String1 2020-08-13 06:52:34.402 INSERT  FALSE   23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2   102 String2 2020-08-13 06:52:34.402 INSERT  FALSE   5b5e429cf3a174303b2f2192b5d602ed9dedd865

到目前为止一切顺利。

但是如果我运行:

UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;

然后 select 来自 Table1_History,我得到:

SELECT * FROM Table1_History;

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
1   1001    String1 2020-08-13 06:52:34.402 INSERT  FALSE   23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2   102 String2 2020-08-13 06:52:34.402 INSERT  FALSE   5b5e429cf3a174303b2f2192b5d602ed9dedd865

METADATA$ACTION 仍然是 INSERTFIELD1 值现在作为 1001 存储在流中。不再有任何记录,我可以看到该行曾经具有 101 的值并且已更新。

如果我运行以下:

DELETE FROM Table1 WHERE XID = 2;

现在直播returns:

SELECT * FROM Table1_History;

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
1   1001    String1 2020-08-13 06:52:34.402 INSERT  FALSE   23bc7a4d83522484f4d7e36edf84b4c7986dfa9b

现在我可以在数据库中第二行的流中看到 0 条记录。

我不明白 Stream table 用于跟踪 UPDATES/DELETES 的要点。这不是使用流吗?

我试着按照这个:Snowflake Streams Made Simple,但我还是不明白。

引用 Snowflake 文档: “流存储 table 的 当前 事务版本,并且在大多数情况下是 CDC 记录的适当来源。”

查看 Snowflake 文档中的这个示例:https://docs.snowflake.com/en/user-guide/streams.html#example-1

我的理解是,流只会保存记录的当前版本,直到您推进偏移量。因此,如果您插入一条记录然后更新它,在增加偏移量之前,它将显示单个插入,但字段将保留最新值。

如果您随后推进偏移量并更新或删除记录,那么这些事件将显示在流中 - 但如果您更新然后删除相同的记录(在推进偏移量之前),流将只显示删除,因为这是该记录的最后位置。

更新 1 听起来您正在尝试对 table 中的记录所做的每个更改实施审计跟踪 - 这不是 Streams 的设计目的,我认为您无法实施解决方案,使用保证记录每个更改的流。

如果您阅读了 Streams 文档,它指出“流可以提供从当前偏移到源的当前事务时间的一组更改 table(即 [=36= 的当前版本]). 流仅维护更改的增量;如果多个 DML 语句更改一行,则流仅包含对该行采取的最新操作。"

CDC 是一个专门与加载数据仓库相关的术语,绝不是捕获对记录所做的每个更改的通用术语。

如果您想在 Snowflake 中创建真正的审计功能,恐怕我不知道这是否可行。时间旅行功能显示 Snowflake 保留了对记录所做的所有更改(在保留期内),但我不知道有任何方法可以访问这些更改;我认为您只能访问某个时间点的记录历史,而您无法知道在什么时候进行了任何更改

更新 2 刚刚意识到 Snowflake 允许在 table 上进行更改跟踪,而不必使用 Streams。如果您想捕获 table 的所有更改,而不仅仅是最新版本,这可能是一个更好的解决方案。该功能记录在此处: https://docs.snowflake.com/en/sql-reference/constructs/changes.html

好吧,正如@NickW 所说,流 table 更多是关于跟踪偏移量之间的变化。这意味着我仍然可以做我想做的事,但它需要在 DML 操作之间显式 INSERT 到历史记录 table。

首先创建主 Table、流和历史 table:

CREATE TABLE Table1
(
   XID INT IDENTITY PRIMARY KEY,
   FIELD1 INT,
   FIELD2 STRING,
   DATECREATED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);

CREATE STREAM Table1_Stream ON TABLE Table1;

CREATE TABLE Table1_History
(
   UID INT IDENTITY PRIMARY KEY,
   XID INT,
   FIELD1 INT,
   FIELD2 STRING,
   DATECREATED TIMESTAMP,
   METADATA$ACTION STRING,   --METADATA Column from Stream
   METADATA$ISUPDATE STRING, --METADATA Column from Stream
   DATEINSERTED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);

然后INSERT记录:

INSERT INTO Table1 (FIELD1,FIELD2)
VALUES
(101,'String1'),
(102,'String2')
;

流 Table 现在显示为:

SELECT * FROM Table1_Stream;

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
1   101    String1 2020-08-13 06:52:34.402 INSERT  FALSE   23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2   102 String2 2020-08-13 06:52:34.402 INSERT  FALSE   5b5e429cf3a174303b2f2192b5d602ed9dedd865

然后从流 table 到历史 table:INSERT:

INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
                           METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
       METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'

请注意 WHERE 子句,它只删除 INSERT 记录,除非它是 UPDATE 的一部分,DELETEs 然后 INSERTs 一条记录。

现在,即使由于 WHERE 子句,流记录实际上并未插入历史记录 table,如果您查询流,您将得到 NULL:

SELECT * FROM Table1_Stream;

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID

现在,如果您执行 UPDATE,流将显示它:

UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;

SELECT * FROM Table1_Stream;

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
1   1001    String1 2020-08-14 09:11:20.173 INSERT  TRUE    93256f240f338581cc4781c2e79a28075e1b66d7
1   101 String1 2020-08-14 09:11:20.173 DELETE  TRUE    93256f240f338581cc4781c2e79a28075e1b66d7

现在运行插入:

INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
                           METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
       METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'

SELECT * FROM Table1_History;

UID XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   DATEINSERTED
1   1   1001    String1 2020-08-14 09:11:20.173 INSERT  TRUE    2020-08-14 09:13:41.474
2   1   101 String1 2020-08-14 09:11:20.173 DELETE  TRUE    2020-08-14 09:13:41.474

并且 Stream 再次为 NULL:

SELECT * FROM Table1_Stream;

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID

如果您 运行 一个 DELETE,更改将再次反映在 Stream 中:

DELETE FROM Table1 WHERE XID = 2

SELECT * FROM Table1_Stream;

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
2   102 String2 2020-08-14 09:11:20.173 DELETE  FALSE   51d1d0f5c5bf9c328d79cbbd54a10bf99f73bcd3

并且可以INSERT进入历史table:

INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
                           METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
       METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'

SELECT * FROM Table1_History;

UID XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   DATEINSERTED
1   1   1001    String1 2020-08-14 09:11:20.173 INSERT  TRUE    2020-08-14 09:13:41.474
2   1   101 String1 2020-08-14 09:11:20.173 DELETE  TRUE    2020-08-14 09:13:41.474
4   2   102 String2 2020-08-14 09:11:20.173 DELETE  FALSE   2020-08-14 09:17:37.694

并且 Stream 再次为 NULL:

SELECT * FROM Table1_Stream;

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID

我不明白的一件事是为什么 Table1_History 上的 UID 增加到 4 而不是 3,但这是一个微不足道的问题。

这就是我跟踪 Snowflake 中所有历史变化的方式。

流不是 table。

流仅测量自您上次使用(或最初定义)流以来的增量。您目前看到的是,在通过流拍摄另一个快照之前,您只是不断地覆盖数据。初始插入值是自上次捕获数据以来发生的变化。通过将插入值注销到另一个 table,将创建一个新快照,从中衡量变化。

我建议您创建一个 table: 创建 table Table_history (...)

对 table1 进行初始插入并查看流: (select * 来自 Table1_History);

现在从您定义的 STREAM 插入您的历史记录 table:

插入 table_history (...) select * 来自 Table1_History ;

再次检查流:select * 来自 Table1_History 数据应该没了。没有。

然后进行更新: 更新 Table1 SET FIELD1 = 1001 WHERE XID = 1;

再次检查流: select * 来自 Table1_History