How to ingest a large Stream table using Snowflake Task?
I am using the Snowflake Kafka Sink Connector to ingest data from Debezium into a Snowflake table. I have created a Stream and a Task on this table. When data from Kafka lands in the source table, the stream is populated and the task runs a MERGE command that writes the data into the final table.
However, now that the stream has grown to roughly 50 million rows, the task can no longer run to completion and times out.
To work around this, I have tried the following:
- Increased the task timeout from 1 hour to 24 hours.
- Increased the warehouse size to Medium.
The task still did not finish within 24 hours and timed out.
Does ingesting 50 million rows really require an even larger warehouse? How can I get the task to run to completion?
MERGE statement
MERGE INTO TARGET.MESSAGE AS P
USING (SELECT RECORD_CONTENT:payload:before.id::VARCHAR AS BEFORE_ID,
RECORD_CONTENT:payload:before.agency_id::VARCHAR AS BEFORE_AGENCY_ID,
RECORD_CONTENT:payload:after.id::VARCHAR AS AFTER_ID,
RECORD_CONTENT:payload:after.agency_id::VARCHAR AS AFTER_AGENCY_ID,
RECORD_CONTENT:payload:after::VARIANT AS PAYLOAD,
RECORD_CONTENT:payload:source.ts_ms::INT AS TS_MS,
RECORD_CONTENT:payload:op::VARCHAR AS OP
FROM RAW.MESSAGE_STREAM
QUALIFY ROW_NUMBER() OVER (
PARTITION BY COALESCE(AFTER_ID, BEFORE_ID), COALESCE(AFTER_AGENCY_ID, BEFORE_AGENCY_ID)
ORDER BY TS_MS DESC
) = 1) PS ON (P.ID = PS.AFTER_ID AND P.AGENCY_ID = PS.AFTER_AGENCY_ID) OR
(P.ID = PS.BEFORE_ID AND P.AGENCY_ID = PS.BEFORE_AGENCY_ID)
WHEN MATCHED AND PS.OP = 'd' THEN DELETE
WHEN MATCHED AND PS.OP IN ('u', 'r') THEN UPDATE SET P.PAYLOAD = PS.PAYLOAD, P.TS_MS = PS.TS_MS
WHEN NOT MATCHED AND PS.OP IN ('c', 'r', 'u') THEN INSERT (P.ID, P.AGENCY_ID, P.PAYLOAD, P.TS_MS) VALUES (PS.AFTER_ID, PS.AFTER_AGENCY_ID, PS.PAYLOAD, PS.TS_MS);
EXPLAIN plan
GlobalStats:
partitionsTotal=742
partitionsAssigned=742
bytesAssigned=3596441600
Operations:
1:0 ->Result number of rows inserted, number of rows updated, number of rows deleted
1:1 ->WindowFunction ROW_NUMBER() OVER (PARTITION BY IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'id'))), IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'agency_id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'agency_id'))) ORDER BY TO_NUMBER(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'source'), 'ts_ms')) DESC NULLS FIRST)
1:2 ->LeftOuterJoin joinFilter: ((P.ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'id')))) AND (P.AGENCY_ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'agency_id'))))) OR ((P.ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'id')))) AND (P.AGENCY_ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'agency_id')))))
1:3 ->Filter ROW_NUMBER() OVER (PARTITION BY IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'id'))), IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'agency_id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'agency_id'))) ORDER BY TO_NUMBER(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'source'), 'ts_ms')) DESC NULLS FIRST) = 1
1:4 ->UnionAll
1:5 ->Filter CHANGES.A_METADATA$ACTION IS NOT NULL
1:6 ->WithReference
1:7 ->WithClause CHANGES
1:8 ->Filter (A.METADATA$SHORTNAME IS NULL) OR (D.METADATA$SHORTNAME IS NULL) OR (NOT(EQUAL_NULL(SCAN_FDN_FILES.RECORD_METADATA, SCAN_FDN_FILES.RECORD_METADATA))) OR (NOT(EQUAL_NULL(SCAN_FDN_FILES.RECORD_CONTENT, SCAN_FDN_FILES.RECORD_CONTENT)))
1:9 ->FullOuterJoin joinKey: (D.METADATA$ROW_ID = A.METADATA$ROW_ID) AND (D.METADATA$SHORTNAME = A.METADATA$SHORTNAME)
1:10 ->TableScan DATABASE.RAW.MESSAGE as SCAN_FDN_FILES METADATA$PARTITION_ROW_NUMBER, METADATA$PARTITION_NAME, RECORD_METADATA, RECORD_CONTENT, METADATA$ORIGINAL_PARTITION_NAME, METADATA$ORIGINAL_PARTITION_ROW_NUMBER {partitionsTotal=17, partitionsAssigned=17, bytesAssigned=20623360}
1:11 ->TableScan DATABASE.RAW.MESSAGE as SCAN_FDN_FILES METADATA$PARTITION_ROW_NUMBER, METADATA$PARTITION_NAME, RECORD_METADATA, RECORD_CONTENT, METADATA$ORIGINAL_PARTITION_NAME, METADATA$ORIGINAL_PARTITION_ROW_NUMBER {partitionsTotal=507, partitionsAssigned=507, bytesAssigned=3519694336}
1:12 ->Filter CHANGES.D_METADATA$ACTION IS NOT NULL
1:13 ->WithReference
1:14 ->TableScan DATABASE.TARGET.MESSAGE as P ID, AGENCY_ID {partitionsTotal=218, partitionsAssigned=218, bytesAssigned=56123904}
Query Profile
I have re-jiggled your SQL, just to make it easier for me to read..
MERGE INTO target.message AS p
USING (
SELECT
record_content:payload:before.id::VARCHAR AS before_id,
record_content:payload:before.agency_id::VARCHAR AS before_agency_id,
record_content:payload:after.id::VARCHAR AS after_id,
record_content:payload:after.agency_id::VARCHAR AS after_agency_id,
record_content:payload:after::VARIANT AS payload,
record_content:payload:source.ts_ms::INT AS ts_ms,
record_content:payload:op::VARCHAR AS op,
COALESCE(after_id, before_id) AS id_a,
COALESCE(after_agency_id, before_agency_id) AS id_b
FROM raw.message_stream
QUALIFY ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) = 1
) AS ps
ON (p.id = ps.after_id AND p.agency_id = ps.after_agency_id) OR
(p.id = ps.before_id AND p.agency_id = ps.before_agency_id)
WHEN MATCHED AND ps.op = 'd'
THEN DELETE
WHEN MATCHED AND ps.op IN ('u', 'r')
THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u')
THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms)
VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);
I don't see anything super scary here. I pushed the COALESCE of the two values used in the QUALIFY up into the SELECT so I could read it more simply.
But looking at the ON logic: you are prepared to match on the before values when the after values don't match. Mixing that with the COALESCE logic, though, can one after value be null while the other is not? That is, if after_id
is null, after_agency_id
should also be null. And if the after values are present but simply don't match, you presumably don't want to bother checking the before values at all. In that case you could just use:
ON p.id = ps.id_a AND p.agency_id = ps.id_b
although you might want to name those columns better. That should improve it.
Coming back to the JOIN logic, another reason the above should be valid is that you are grouping/partitioning the ROW_NUMBER on the after values when they exist, which means that if you have rows with the same after values but different before values, the latter get discarded by the current ROW_NUMBER anyway.
But beyond that it doesn't look like it's doing anything "really bad", at which point you might want to run a 4-8x larger warehouse for 2/4/8 hours and see whether it completes in a fraction of the time. The cost of the larger warehouse should be offset by the much smaller wall-clock time.
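As a concrete sketch of that test (the warehouse name is a placeholder; Medium -> X-Large is 4x the compute, Medium -> 2X-Large is 8x):

```sql
-- one-off test: size the task's warehouse up 8x (Medium -> 2X-Large)
ALTER WAREHOUSE my_task_wh SET WAREHOUSE_SIZE = 'XXLARGE';

-- ... let the task run to completion ...

-- then size it back down so you are not paying 8x at idle
ALTER WAREHOUSE my_task_wh SET WAREHOUSE_SIZE = 'MEDIUM';
```

Larger warehouses bill proportionally more credits per hour, so if the query finishes in roughly 1/8th the wall-clock time, the total cost comes out about the same.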
Silly idea:
On the smaller data set you mentioned, try the SQL stripped right back to something very simple:
MERGE INTO target.message AS p
USING (
SELECT
b.before_id,
b.before_agency_id,
b.after_id,
b.after_agency_id,
b.payload,
b.ts_ms,
b.op
FROM (
SELECT
A.*
,COALESCE(a.after_id, a.before_id) AS id_a
,COALESCE(a.after_agency_id, a.before_agency_id) AS id_b
,ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) as rn
FROM (
SELECT
record_content:payload:before.id::VARCHAR AS before_id,
record_content:payload:before.agency_id::VARCHAR AS before_agency_id,
record_content:payload:after.id::VARCHAR AS after_id,
record_content:payload:after.agency_id::VARCHAR AS after_agency_id,
record_content:payload:after::VARIANT AS payload,
record_content:payload:source.ts_ms::INT AS ts_ms,
record_content:payload:op::VARCHAR AS op
FROM raw.message_stream
) AS A
) AS B
WHERE b.rn = 1
) AS ps
ON (p.id = ps.after_id AND p.agency_id = ps.after_agency_id) OR
(p.id = ps.before_id AND p.agency_id = ps.before_agency_id)
WHEN MATCHED AND ps.op = 'd'
THEN DELETE
WHEN MATCHED AND ps.op IN ('u', 'r')
THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u')
THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms)
VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);
And the join that I suspect will work well for your data.. on cloned tables, just to see what the performance impact is:
MERGE INTO target.message AS p
USING (
SELECT
--b.before_id,
--b.before_agency_id,
b.after_id,
b.after_agency_id,
b.payload,
b.ts_ms,
b.op,
b.id_a,
b.id_b
FROM (
SELECT
A.*
,COALESCE(a.after_id, a.before_id) AS id_a
,COALESCE(a.after_agency_id, a.before_agency_id) AS id_b
,ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) as rn
FROM (
SELECT
record_content:payload:before.id::VARCHAR AS before_id,
record_content:payload:before.agency_id::VARCHAR AS before_agency_id,
record_content:payload:after.id::VARCHAR AS after_id,
record_content:payload:after.agency_id::VARCHAR AS after_agency_id,
record_content:payload:after::VARIANT AS payload,
record_content:payload:source.ts_ms::INT AS ts_ms,
record_content:payload:op::VARCHAR AS op
FROM raw.message_stream
) AS A
) AS B
WHERE b.rn = 1
) AS ps
ON p.id = ps.id_a AND p.agency_id = ps.id_b
WHEN MATCHED AND ps.op = 'd'
THEN DELETE
WHEN MATCHED AND ps.op IN ('u', 'r')
THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u')
THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms)
VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);
One more thing to try, to "chew through the backlog":
Break the task into multiple steps. For now, make a temporary table out of the first half:
CREATE TABLE perm_but_call_temp_table AS
SELECT
record_content:payload:before.id::VARCHAR AS before_id,
record_content:payload:before.agency_id::VARCHAR AS before_agency_id,
record_content:payload:after.id::VARCHAR AS after_id,
record_content:payload:after.agency_id::VARCHAR AS after_agency_id,
record_content:payload:after::VARIANT AS payload,
record_content:payload:source.ts_ms::INT AS ts_ms,
record_content:payload:op::VARCHAR AS op,
COALESCE(after_id, before_id) AS id_a,
COALESCE(after_agency_id, before_agency_id) AS id_b
FROM raw.message_stream
QUALIFY ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) = 1;
Then merge that into your main table:
MERGE INTO target.message AS p
USING perm_but_call_temp_table AS ps
ON p.id = ps.id_a AND p.agency_id = ps.id_b
WHEN MATCHED AND ps.op = 'd'
THEN DELETE
WHEN MATCHED AND ps.op IN ('u', 'r')
THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u')
THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms)
VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);
This will show you whether "the problem" is in the first or the second operation. It also lets you merge into a clone and test whether the equi-join version runs faster and produces the same results.
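If the two-step approach works out, it can be wired up as a small task graph, so each step is timed (and can be retried) independently. A minimal sketch, with placeholder task/warehouse names and the statement bodies elided to the SQL shown above:

```sql
-- Step 1: materialise the deduplicated stream contents into the staging table
CREATE OR REPLACE TASK build_staging
  WAREHOUSE = my_task_wh
  SCHEDULE = '60 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('RAW.MESSAGE_STREAM')
AS
  CREATE OR REPLACE TABLE perm_but_call_temp_table AS
  SELECT /* ... the first-half SELECT from above ... */;

-- Step 2: runs only after step 1 succeeds
CREATE OR REPLACE TASK merge_staging
  WAREHOUSE = my_task_wh
  AFTER build_staging
AS
  MERGE INTO target.message AS p
  USING perm_but_call_temp_table AS ps
  /* ... the ON / WHEN clauses from above ... */;

-- child tasks are resumed before the root
ALTER TASK merge_staging RESUME;
ALTER TASK build_staging RESUME;
```

The SYSTEM$STREAM_HAS_DATA guard means the root task is skipped (and no warehouse credits are spent) when the stream is empty.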