How to ingest a large Stream table using Snowflake Task?

I am using the Snowflake Kafka Sink Connector to ingest data from Debezium into a Snowflake table. I have created a Stream and a Task on this table. When data from Kafka lands in the source table, the stream is populated and the task runs a MERGE command to write the data into the final table.
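For reference, the stream and task are set up roughly like this (object, warehouse, and schedule names here are placeholders, not the real definitions):

```sql
-- Illustrative only: placeholder names, not the actual DDL
CREATE OR REPLACE STREAM raw.message_stream ON TABLE raw.message;

CREATE OR REPLACE TASK raw.message_merge_task
    WAREHOUSE = ingest_wh
    SCHEDULE = '5 MINUTE'
    USER_TASK_TIMEOUT_MS = 3600000   -- originally 1 hour
WHEN SYSTEM$STREAM_HAS_DATA('RAW.MESSAGE_STREAM')
AS
    -- the MERGE statement shown below
    MERGE INTO target.message AS p USING (...) AS ps ON (...) ...;
```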

However, since the stream has grown to roughly 50 million rows, the task can no longer run to completion and times out.

To address this, I have tried the following:

  1. Increased the task timeout from 1 hour to 24 hours.
  2. Increased the warehouse size to Medium.

The task still did not finish within 24 hours and timed out.

Does ingesting 50 million rows really require an even larger warehouse? How can I get the task to run to completion?

The MERGE statement

MERGE INTO TARGET.MESSAGE AS P
        USING (SELECT RECORD_CONTENT:payload:before.id::VARCHAR          AS BEFORE_ID,
                      RECORD_CONTENT:payload:before.agency_id::VARCHAR   AS BEFORE_AGENCY_ID,
                      RECORD_CONTENT:payload:after.id::VARCHAR           AS AFTER_ID,
                      RECORD_CONTENT:payload:after.agency_id::VARCHAR    AS AFTER_AGENCY_ID,
                      RECORD_CONTENT:payload:after::VARIANT              AS PAYLOAD,
                      RECORD_CONTENT:payload:source.ts_ms::INT           AS TS_MS,
                      RECORD_CONTENT:payload:op::VARCHAR                 AS OP
               FROM RAW.MESSAGE_STREAM
                   QUALIFY ROW_NUMBER() OVER (
                       PARTITION BY COALESCE(AFTER_ID, BEFORE_ID), COALESCE(AFTER_AGENCY_ID, BEFORE_AGENCY_ID)
                       ORDER BY TS_MS DESC
                       ) = 1) PS ON (P.ID = PS.AFTER_ID AND P.AGENCY_ID = PS.AFTER_AGENCY_ID) OR
                                    (P.ID = PS.BEFORE_ID AND P.AGENCY_ID = PS.BEFORE_AGENCY_ID)
        WHEN MATCHED AND PS.OP = 'd' THEN DELETE
        WHEN MATCHED AND PS.OP IN ('u', 'r') THEN UPDATE SET P.PAYLOAD = PS.PAYLOAD, P.TS_MS = PS.TS_MS
        WHEN NOT MATCHED AND PS.OP IN ('c', 'r', 'u') THEN INSERT (P.ID, P.AGENCY_ID, P.PAYLOAD, P.TS_MS) VALUES (PS.AFTER_ID, PS.AFTER_AGENCY_ID, PS.PAYLOAD, PS.TS_MS);

EXPLAIN plan

GlobalStats:
    partitionsTotal=742
    partitionsAssigned=742
    bytesAssigned=3596441600
Operations:
1:0     ->Result  number of rows inserted, number of rows updated, number of rows deleted  
1:1          ->WindowFunction  ROW_NUMBER() OVER (PARTITION BY IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'id'))), IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'agency_id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'agency_id'))) ORDER BY TO_NUMBER(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'source'), 'ts_ms')) DESC NULLS FIRST)  
1:2               ->LeftOuterJoin  joinFilter: ((P.ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'id')))) AND (P.AGENCY_ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'agency_id'))))) OR ((P.ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'id')))) AND (P.AGENCY_ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'agency_id')))))  
1:3                    ->Filter  ROW_NUMBER() OVER (PARTITION BY IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'id'))), IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'agency_id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'agency_id'))) ORDER BY TO_NUMBER(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'source'), 'ts_ms')) DESC NULLS FIRST) = 1  
1:4                         ->UnionAll  
1:5                              ->Filter  CHANGES.A_METADATA$ACTION IS NOT NULL  
1:6                                   ->WithReference  
1:7                                        ->WithClause  CHANGES  
1:8                                             ->Filter  (A.METADATA$SHORTNAME IS NULL) OR (D.METADATA$SHORTNAME IS NULL) OR (NOT(EQUAL_NULL(SCAN_FDN_FILES.RECORD_METADATA, SCAN_FDN_FILES.RECORD_METADATA))) OR (NOT(EQUAL_NULL(SCAN_FDN_FILES.RECORD_CONTENT, SCAN_FDN_FILES.RECORD_CONTENT)))  
1:9                                                  ->FullOuterJoin  joinKey: (D.METADATA$ROW_ID = A.METADATA$ROW_ID) AND (D.METADATA$SHORTNAME = A.METADATA$SHORTNAME)  
1:10                                                       ->TableScan  DATABASE.RAW.MESSAGE as SCAN_FDN_FILES  METADATA$PARTITION_ROW_NUMBER, METADATA$PARTITION_NAME, RECORD_METADATA, RECORD_CONTENT, METADATA$ORIGINAL_PARTITION_NAME, METADATA$ORIGINAL_PARTITION_ROW_NUMBER  {partitionsTotal=17, partitionsAssigned=17, bytesAssigned=20623360}
1:11                                                       ->TableScan  DATABASE.RAW.MESSAGE as SCAN_FDN_FILES  METADATA$PARTITION_ROW_NUMBER, METADATA$PARTITION_NAME, RECORD_METADATA, RECORD_CONTENT, METADATA$ORIGINAL_PARTITION_NAME, METADATA$ORIGINAL_PARTITION_ROW_NUMBER  {partitionsTotal=507, partitionsAssigned=507, bytesAssigned=3519694336}
1:12                              ->Filter  CHANGES.D_METADATA$ACTION IS NOT NULL  
1:13                                   ->WithReference  
1:14                    ->TableScan  DATABASE.TARGET.MESSAGE as P  ID, AGENCY_ID  {partitionsTotal=218, partitionsAssigned=218, bytesAssigned=56123904}

Query Profile

I have re-jiggled your SQL, just to make it more readable to me:

MERGE INTO target.message AS p
    USING (
    SELECT 
        record_content:payload:before.id::VARCHAR          AS before_id,
        record_content:payload:before.agency_id::VARCHAR   AS before_agency_id,
        record_content:payload:after.id::VARCHAR           AS after_id,
        record_content:payload:after.agency_id::VARCHAR    AS after_agency_id,
        record_content:payload:after::VARIANT              AS payload,
        record_content:payload:source.ts_ms::INT           AS ts_ms,
        record_content:payload:op::VARCHAR                 AS op,
        COALESCE(after_id, before_id) AS id_a,
        COALESCE(after_agency_id, before_agency_id) AS id_b
    FROM raw.message_stream
    QUALIFY ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) = 1
) AS ps 
    ON (p.id = ps.after_id AND p.agency_id = ps.after_agency_id) OR
       (p.id = ps.before_id AND p.agency_id = ps.before_agency_id)
WHEN MATCHED AND ps.op = 'd' 
    THEN DELETE
WHEN MATCHED AND ps.op IN ('u', 'r') 
    THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u') 
    THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms) 
        VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);

I don't see anything super scary here. I pushed the COALESCE of the two values used in the QUALIFY into the SELECT, so I could read it more simply.

But looking at the ON logic, you are prepared to match on the before values if the after values don't match. Mixing that with the COALESCE logic, though: aren't the two after values null at the same time? That is, if after_id is null, after_agency_id will also be null. And if the "after" values are not null but don't match, you presumably don't want to fall back to checking the "before" values either. In that case you could use:

ON p.id = ps.id_a AND p.agency_id = ps.id_b

Albeit you might want to name them better. That should improve things.

Back on the JOIN logic, another reason I think the above may apply: you are grouping/partitioning the ROW_NUMBER on the after values when they exist, which means that if you have rows with the same after values but different before values, the latter may be discarded by the current ROW_NUMBER anyway.

But outside that, it doesn't look like it's doing anything "really bad". At this point you might want to run a 4-8x larger warehouse for those 24 hours and see if it completes in a fraction of the time. The cost of the larger warehouse should be offset by the smaller wall-clock time.
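As a sketch of that (warehouse and task names are assumptions, and note a task must be suspended before it can be altered): Medium is 4 credits/hour, so XLARGE/X2LARGE is the 4-8x range.

```sql
-- Hypothetical names; X2LARGE is 8x the compute of MEDIUM
ALTER WAREHOUSE ingest_wh SET WAREHOUSE_SIZE = 'X2LARGE';

-- Keep the long timeout while the backlog drains (24 h in ms)
ALTER TASK raw.message_merge_task SUSPEND;
ALTER TASK raw.message_merge_task SET USER_TASK_TIMEOUT_MS = 86400000;
ALTER TASK raw.message_merge_task RESUME;
```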

Dumb ideas:

On the smaller data set you mentioned, try the SQL written out really simply:

MERGE INTO target.message AS p
USING (
    SELECT 
        b.before_id,
        b.before_agency_id,
        b.after_id,
        b.after_agency_id,
        b.payload,
        b.ts_ms,
        b.op
    FROM (
        SELECT 
            A.*
            ,COALESCE(a.after_id, a.before_id) AS id_a
            ,COALESCE(a.after_agency_id, a.before_agency_id) AS id_b
            ,ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) as rn
        FROM (
            SELECT 
                record_content:payload:before.id::VARCHAR          AS before_id,
                record_content:payload:before.agency_id::VARCHAR   AS before_agency_id,
                record_content:payload:after.id::VARCHAR           AS after_id,
                record_content:payload:after.agency_id::VARCHAR    AS after_agency_id,
                record_content:payload:after::VARIANT              AS payload,
                record_content:payload:source.ts_ms::INT           AS ts_ms,
                record_content:payload:op::VARCHAR                 AS op
            FROM raw.message_stream
        ) as A
    ) AS B
    WHERE b.rn = 1   
) AS ps 
    ON (p.id = ps.after_id AND p.agency_id = ps.after_agency_id) OR
       (p.id = ps.before_id AND p.agency_id = ps.before_agency_id)
WHEN MATCHED AND ps.op = 'd' 
    THEN DELETE
WHEN MATCHED AND ps.op IN ('u', 'r') 
    THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u') 
    THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms) 
        VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);

And this version, where I suspect the join may not be valid for your data.. try it on cloned tables, just to see what the performance impact is:

MERGE INTO target.message AS p
USING (
    SELECT 
        --b.before_id,
        --b.before_agency_id,
        b.after_id,
        b.after_agency_id,
        b.payload,
        b.ts_ms,
        b.op,
        b.id_a,
        b.id_b
    FROM (
        SELECT 
            A.*
            ,COALESCE(a.after_id, a.before_id) AS id_a
            ,COALESCE(a.after_agency_id, a.before_agency_id) AS id_b
            ,ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) as rn
        FROM (
            SELECT 
                record_content:payload:before.id::VARCHAR          AS before_id,
                record_content:payload:before.agency_id::VARCHAR   AS before_agency_id,
                record_content:payload:after.id::VARCHAR           AS after_id,
                record_content:payload:after.agency_id::VARCHAR    AS after_agency_id,
                record_content:payload:after::VARIANT              AS payload,
                record_content:payload:source.ts_ms::INT           AS ts_ms,
                record_content:payload:op::VARCHAR                 AS op
            FROM raw.message_stream
        ) as A
    ) AS B
    WHERE b.rn = 1   
) AS ps 
    ON p.id = ps.id_a AND p.agency_id = ps.id_b
WHEN MATCHED AND ps.op = 'd' 
    THEN DELETE
WHEN MATCHED AND ps.op IN ('u', 'r') 
    THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u') 
    THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms) 
        VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);

Another thing to try, thinking about the backlog:

Break the task into multiple steps. As it stands, make a temporary table that is the first half:

CREATE TABLE perm_but_call_temp_table AS
   SELECT 
        record_content:payload:before.id::VARCHAR          AS before_id,
        record_content:payload:before.agency_id::VARCHAR   AS before_agency_id,
        record_content:payload:after.id::VARCHAR           AS after_id,
        record_content:payload:after.agency_id::VARCHAR    AS after_agency_id,
        record_content:payload:after::VARIANT              AS payload,
        record_content:payload:source.ts_ms::INT           AS ts_ms,
        record_content:payload:op::VARCHAR                 AS op,
        COALESCE(after_id, before_id) AS id_a,
        COALESCE(after_agency_id, before_agency_id) AS id_b
    FROM raw.message_stream
    QUALIFY ROW_NUMBER() OVER (PARTITION BY id_a, id_b ORDER BY ts_ms DESC ) = 1;

Then merge that into your main table:

MERGE INTO target.message AS p
USING perm_but_call_temp_table AS ps 
    ON p.id = ps.id_a AND p.agency_id = ps.id_b
WHEN MATCHED AND ps.op = 'd' 
    THEN DELETE
WHEN MATCHED AND ps.op IN ('u', 'r') 
    THEN UPDATE SET p.payload = ps.payload, p.ts_ms = ps.ts_ms
WHEN NOT MATCHED AND ps.op IN ('c', 'r', 'u') 
    THEN INSERT (p.id, p.agency_id, p.payload, p.ts_ms) 
        VALUES (ps.after_id, ps.after_agency_id, ps.payload, ps.ts_ms);

This will show you where the problem lies, in the first or the second operation. It also lets you merge into a clone, and test whether the equi-join version runs faster and produces the same results.
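A sketch of that clone-and-compare test (clone names are made up). Since the first DML that reads the stream advances its offset, source both runs from the staged perm_but_call_temp_table rather than from the stream itself:

```sql
-- Zero-copy clones so both MERGE variants start from identical state
CREATE TABLE target.message_or_test   CLONE target.message;
CREATE TABLE target.message_equi_test CLONE target.message;

-- Run the original OR-join MERGE against message_or_test and the
-- equi-join version against message_equi_test, both sourced from
-- perm_but_call_temp_table. Then diff: this should return zero rows
-- if the two versions agree.
(SELECT id, agency_id, ts_ms FROM target.message_or_test
 MINUS
 SELECT id, agency_id, ts_ms FROM target.message_equi_test)
UNION ALL
(SELECT id, agency_id, ts_ms FROM target.message_equi_test
 MINUS
 SELECT id, agency_id, ts_ms FROM target.message_or_test);
```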