雪花使用流跟踪 Updates/Deletes 到 Table
Snowflake Using Streams to Track Updates/Deletes to a Table
我无法理解 Streams 在跟踪更改方面的工作原理。我想创建一个历史记录 table 来跟踪每个 UPDATE
和 DELETE
到 table,但我发现我不明白它是如何工作的。
如果我有 table Table1
和流:
CREATE TABLE Table1
(
XID INT IDENTITY PRIMARY KEY,
FIELD1 INT,
FIELD2 STRING,
DATECREATED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);
CREATE STREAM Table1_History ON TABLE Table1;
如果我插入数据:
INSERT INTO Table1 (FIELD1,FIELD2)
VALUES
(101,'String1'),
(102,'String2')
;
然后运行:
SELECT * FROM Table1_History;
它returns以下内容:
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 101 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2 102 String2 2020-08-13 06:52:34.402 INSERT FALSE 5b5e429cf3a174303b2f2192b5d602ed9dedd865
到目前为止一切顺利。
但是如果我运行:
UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;
然后 select 来自 Table1_History
,我得到:
SELECT * FROM Table1_History;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 1001 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2 102 String2 2020-08-13 06:52:34.402 INSERT FALSE 5b5e429cf3a174303b2f2192b5d602ed9dedd865
METADATA$ACTION
仍然是 INSERT
,FIELD1
值现在作为 1001
存储在流中。不再有任何记录,我可以看到该行曾经具有 101
的值并且已更新。
如果我运行以下:
DELETE FROM Table1 WHERE XID = 2;
现在直播returns:
SELECT * FROM Table1_History;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 1001 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
现在我可以在数据库中第二行的流中看到 0 条记录。
我不明白 Stream table 用于跟踪 UPDATES/DELETES
的要点。这不是使用流吗?
我试着按照这个:Snowflake Streams Made Simple,但我还是不明白。
引用 Snowflake 文档:
“流存储 table 的 当前 事务版本,并且在大多数情况下是 CDC 记录的适当来源。”
查看 Snowflake 文档中的这个示例:https://docs.snowflake.com/en/user-guide/streams.html#example-1
我的理解是,流只会保存记录的当前版本,直到您推进偏移量。因此,如果您插入一条记录然后更新它,在增加偏移量之前,它将显示单个插入,但字段将保留最新值。
如果您随后推进偏移量并更新或删除记录,那么这些事件将显示在流中 - 但如果您更新然后删除相同的记录(在推进偏移量之前),流将只显示删除,因为这是该记录的最后位置。
更新 1
听起来您正在尝试对 table 中的记录所做的每个更改实施审计跟踪 - 这不是 Streams 的设计目的,我认为您无法实施解决方案,使用保证记录每个更改的流。
如果您阅读了 Streams 文档,它指出“流可以提供从当前偏移到源的当前事务时间的一组更改 table(即 [=36= 的当前版本]). 流仅维护更改的增量;如果多个 DML 语句更改一行,则流仅包含对该行采取的最新操作。"
CDC 是一个专门与加载数据仓库相关的术语,绝不是捕获对记录所做的每个更改的通用术语。
如果您想在 Snowflake 中创建真正的审计功能,恐怕我不知道这是否可行。时间旅行功能显示 Snowflake 保留了对记录所做的所有更改(在保留期内),但我不知道有任何方法可以访问这些更改;我认为您只能访问某个时间点的记录历史,而您无法知道在什么时候进行了任何更改
更新 2
刚刚意识到 Snowflake 允许在 table 上进行更改跟踪,而不必使用 Streams。如果您想捕获 table 的所有更改,而不仅仅是最新版本,这可能是一个更好的解决方案。该功能记录在此处:
https://docs.snowflake.com/en/sql-reference/constructs/changes.html
好吧,正如@NickW 所说,流 table 更多是关于跟踪偏移量之间的变化。这意味着我仍然可以做我想做的事,但它需要在 DML
操作之间显式 INSERT
到历史记录 table。
首先创建主 Table、流和历史 table:
CREATE TABLE Table1
(
XID INT IDENTITY PRIMARY KEY,
FIELD1 INT,
FIELD2 STRING,
DATECREATED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);
CREATE STREAM Table1_Stream ON TABLE Table1;
CREATE TABLE Table1_History
(
UID INT IDENTITY PRIMARY KEY,
XID INT,
FIELD1 INT,
FIELD2 STRING,
DATECREATED TIMESTAMP,
METADATA$ACTION STRING, --METADATA Column from Stream
METADATA$ISUPDATE STRING, --METADATA Column from Stream
DATEINSERTED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);
然后INSERT
记录:
INSERT INTO Table1 (FIELD1,FIELD2)
VALUES
(101,'String1'),
(102,'String2')
;
流 Table 现在显示为:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 101 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2 102 String2 2020-08-13 06:52:34.402 INSERT FALSE 5b5e429cf3a174303b2f2192b5d602ed9dedd865
然后从流 table 到历史 table:INSERT
:
INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'
请注意 WHERE
子句,它只删除 INSERT
记录,除非它是 UPDATE
的一部分,DELETEs
然后 INSERTs
一条记录。
现在,即使由于 WHERE
子句,流记录实际上并未插入历史记录 table,如果您查询流,您将得到 NULL:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
现在,如果您执行 UPDATE
,流将显示它:
UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 1001 String1 2020-08-14 09:11:20.173 INSERT TRUE 93256f240f338581cc4781c2e79a28075e1b66d7
1 101 String1 2020-08-14 09:11:20.173 DELETE TRUE 93256f240f338581cc4781c2e79a28075e1b66d7
现在运行插入:
INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'
SELECT * FROM Table1_History;
UID XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE DATEINSERTED
1 1 1001 String1 2020-08-14 09:11:20.173 INSERT TRUE 2020-08-14 09:13:41.474
2 1 101 String1 2020-08-14 09:11:20.173 DELETE TRUE 2020-08-14 09:13:41.474
并且 Stream 再次为 NULL:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
如果您 运行 一个 DELETE
,更改将再次反映在 Stream 中:
DELETE FROM Table1 WHERE XID = 2
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
2 102 String2 2020-08-14 09:11:20.173 DELETE FALSE 51d1d0f5c5bf9c328d79cbbd54a10bf99f73bcd3
并且可以INSERT
进入历史table:
INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'
SELECT * FROM Table1_History;
UID XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE DATEINSERTED
1 1 1001 String1 2020-08-14 09:11:20.173 INSERT TRUE 2020-08-14 09:13:41.474
2 1 101 String1 2020-08-14 09:11:20.173 DELETE TRUE 2020-08-14 09:13:41.474
4 2 102 String2 2020-08-14 09:11:20.173 DELETE FALSE 2020-08-14 09:17:37.694
并且 Stream 再次为 NULL:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
我不明白的一件事是为什么 Table1_History
上的 UID
增加到 4
而不是 3
,但这是一个微不足道的问题。
这就是我跟踪 Snowflake 中所有历史变化的方式。
流不是 table。
流仅测量自您上次使用(或最初定义)流以来的增量。您目前看到的是,在通过流拍摄另一个快照之前,您只是不断地覆盖数据。初始插入值是自上次捕获数据以来发生的变化。通过将插入值注销到另一个 table,将创建一个新快照,从中衡量变化。
我建议您创建一个 table:
创建 table Table_history (...)
对 table1 进行初始插入并查看流:
(select * 来自 Table1_History);
现在从您定义的 STREAM 插入您的历史记录 table:
插入 table_history (...)
select * 来自 Table1_History ;
再次检查流:select * 来自 Table1_History
数据应该没了。没有。
然后进行更新:
更新 Table1 SET FIELD1 = 1001 WHERE XID = 1;
再次检查流:
select * 来自 Table1_History
我无法理解 Streams 在跟踪更改方面的工作原理。我想创建一个历史记录 table 来跟踪每个 UPDATE
和 DELETE
到 table,但我发现我不明白它是如何工作的。
如果我有 table Table1
和流:
CREATE TABLE Table1
(
XID INT IDENTITY PRIMARY KEY,
FIELD1 INT,
FIELD2 STRING,
DATECREATED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);
CREATE STREAM Table1_History ON TABLE Table1;
如果我插入数据:
INSERT INTO Table1 (FIELD1,FIELD2)
VALUES
(101,'String1'),
(102,'String2')
;
然后运行:
SELECT * FROM Table1_History;
它returns以下内容:
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 101 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2 102 String2 2020-08-13 06:52:34.402 INSERT FALSE 5b5e429cf3a174303b2f2192b5d602ed9dedd865
到目前为止一切顺利。
但是如果我运行:
UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;
然后 select 来自 Table1_History
,我得到:
SELECT * FROM Table1_History;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 1001 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2 102 String2 2020-08-13 06:52:34.402 INSERT FALSE 5b5e429cf3a174303b2f2192b5d602ed9dedd865
METADATA$ACTION
仍然是 INSERT
,FIELD1
值现在作为 1001
存储在流中。不再有任何记录,我可以看到该行曾经具有 101
的值并且已更新。
如果我运行以下:
DELETE FROM Table1 WHERE XID = 2;
现在直播returns:
SELECT * FROM Table1_History;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 1001 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
现在我可以在数据库中第二行的流中看到 0 条记录。
我不明白 Stream table 用于跟踪 UPDATES/DELETES
的要点。这不是使用流吗?
我试着按照这个:Snowflake Streams Made Simple,但我还是不明白。
引用 Snowflake 文档: “流存储 table 的 当前 事务版本,并且在大多数情况下是 CDC 记录的适当来源。”
查看 Snowflake 文档中的这个示例:https://docs.snowflake.com/en/user-guide/streams.html#example-1
我的理解是,流只会保存记录的当前版本,直到您推进偏移量。因此,如果您插入一条记录然后更新它,在增加偏移量之前,它将显示单个插入,但字段将保留最新值。
如果您随后推进偏移量并更新或删除记录,那么这些事件将显示在流中 - 但如果您更新然后删除相同的记录(在推进偏移量之前),流将只显示删除,因为这是该记录的最后位置。
更新 1 听起来您正在尝试对 table 中的记录所做的每个更改实施审计跟踪 - 这不是 Streams 的设计目的,我认为您无法实施解决方案,使用保证记录每个更改的流。
如果您阅读了 Streams 文档,它指出“流可以提供从当前偏移到源的当前事务时间的一组更改 table(即 [=36= 的当前版本]). 流仅维护更改的增量;如果多个 DML 语句更改一行,则流仅包含对该行采取的最新操作。"
CDC 是一个专门与加载数据仓库相关的术语,绝不是捕获对记录所做的每个更改的通用术语。
如果您想在 Snowflake 中创建真正的审计功能,恐怕我不知道这是否可行。时间旅行功能显示 Snowflake 保留了对记录所做的所有更改(在保留期内),但我不知道有任何方法可以访问这些更改;我认为您只能访问某个时间点的记录历史,而您无法知道在什么时候进行了任何更改
更新 2 刚刚意识到 Snowflake 允许在 table 上进行更改跟踪,而不必使用 Streams。如果您想捕获 table 的所有更改,而不仅仅是最新版本,这可能是一个更好的解决方案。该功能记录在此处: https://docs.snowflake.com/en/sql-reference/constructs/changes.html
好吧,正如@NickW 所说,流 table 更多是关于跟踪偏移量之间的变化。这意味着我仍然可以做我想做的事,但它需要在 DML
操作之间显式 INSERT
到历史记录 table。
首先创建主 Table、流和历史 table:
CREATE TABLE Table1
(
XID INT IDENTITY PRIMARY KEY,
FIELD1 INT,
FIELD2 STRING,
DATECREATED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);
CREATE STREAM Table1_Stream ON TABLE Table1;
CREATE TABLE Table1_History
(
UID INT IDENTITY PRIMARY KEY,
XID INT,
FIELD1 INT,
FIELD2 STRING,
DATECREATED TIMESTAMP,
METADATA$ACTION STRING, --METADATA Column from Stream
METADATA$ISUPDATE STRING, --METADATA Column from Stream
DATEINSERTED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);
然后INSERT
记录:
INSERT INTO Table1 (FIELD1,FIELD2)
VALUES
(101,'String1'),
(102,'String2')
;
流 Table 现在显示为:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 101 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2 102 String2 2020-08-13 06:52:34.402 INSERT FALSE 5b5e429cf3a174303b2f2192b5d602ed9dedd865
然后从流 table 到历史 table:INSERT
:
INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'
请注意 WHERE
子句,它只删除 INSERT
记录,除非它是 UPDATE
的一部分,DELETEs
然后 INSERTs
一条记录。
现在,即使由于 WHERE
子句,流记录实际上并未插入历史记录 table,如果您查询流,您将得到 NULL:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
现在,如果您执行 UPDATE
,流将显示它:
UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 1001 String1 2020-08-14 09:11:20.173 INSERT TRUE 93256f240f338581cc4781c2e79a28075e1b66d7
1 101 String1 2020-08-14 09:11:20.173 DELETE TRUE 93256f240f338581cc4781c2e79a28075e1b66d7
现在运行插入:
INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'
SELECT * FROM Table1_History;
UID XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE DATEINSERTED
1 1 1001 String1 2020-08-14 09:11:20.173 INSERT TRUE 2020-08-14 09:13:41.474
2 1 101 String1 2020-08-14 09:11:20.173 DELETE TRUE 2020-08-14 09:13:41.474
并且 Stream 再次为 NULL:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
如果您 运行 一个 DELETE
,更改将再次反映在 Stream 中:
DELETE FROM Table1 WHERE XID = 2
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
2 102 String2 2020-08-14 09:11:20.173 DELETE FALSE 51d1d0f5c5bf9c328d79cbbd54a10bf99f73bcd3
并且可以INSERT
进入历史table:
INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'
SELECT * FROM Table1_History;
UID XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE DATEINSERTED
1 1 1001 String1 2020-08-14 09:11:20.173 INSERT TRUE 2020-08-14 09:13:41.474
2 1 101 String1 2020-08-14 09:11:20.173 DELETE TRUE 2020-08-14 09:13:41.474
4 2 102 String2 2020-08-14 09:11:20.173 DELETE FALSE 2020-08-14 09:17:37.694
并且 Stream 再次为 NULL:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
我不明白的一件事是为什么 Table1_History
上的 UID
增加到 4
而不是 3
,但这是一个微不足道的问题。
这就是我跟踪 Snowflake 中所有历史变化的方式。
流不是 table。
流仅测量自您上次使用(或最初定义)流以来的增量。您目前看到的是,在通过流拍摄另一个快照之前,您只是不断地覆盖数据。初始插入值是自上次捕获数据以来发生的变化。通过将插入值注销到另一个 table,将创建一个新快照,从中衡量变化。
我建议您创建一个 table: 创建 table Table_history (...)
对 table1 进行初始插入并查看流: (select * 来自 Table1_History);
现在从您定义的 STREAM 插入您的历史记录 table:
插入 table_history (...) select * 来自 Table1_History ;
再次检查流:select * 来自 Table1_History 数据应该没了。没有。
然后进行更新: 更新 Table1 SET FIELD1 = 1001 WHERE XID = 1;
再次检查流: select * 来自 Table1_History