如何在 Snowflake 任务语句中使用复制存储集成?

How to use copy Storage Integration in a Snowflake task statement?

我正在测试 SnowFlake。为此,我在 GCP.

上创建了一个 SnowFlake 实例

其中一项测试是尝试每天从 STORAGE INTEGRATION 加载数据。

为此,我生成了 STORAGE INTEGRATIONstage

我测试了副本

copy into DEMO_DB.PUBLIC.DATA_BY_REGION from @sg_gcs_covid pattern='.*data_by_region.*'

一切顺利。

现在是时候用 task 语句测试每日调度了。 我创建了这个任务:

CREATE TASK schedule_regioni
  WAREHOUSE = COMPUTE_WH
  SCHEDULE = 'USING CRON 42 18 9 9 * Europe/Rome'
  COMMENT = 'Test Schedule'
AS
  copy into DEMO_DB.PUBLIC.DATA_BY_REGION from @sg_gcs_covid pattern='.*data_by_region.*';

我启用了它:

alter task schedule_regioni resume;

我没有收到任何错误,但任务没有加载数据。 为了解决这个问题,我不得不将 copy 放入存储过程并插入存储过程的调用而不是复制:

DROP TASK schedule_regioni;
CREATE TASK schedule_regioni
  WAREHOUSE = COMPUTE_WH
  SCHEDULE = 'USING CRON 42 18 9 9 * Europe/Rome'
  COMMENT = 'Test Schedule'
AS
  call sp_upload_c19_regioni();

问题是:这是期望的行为还是问题(如我所想)?

有人可以给我一些这方面的信息吗?

我刚刚尝试过(但在 AWS S3 上使用了存储集成和阶段)并且在任务的 sql 部分使用复制命令也能正常工作,无需调用存储过程。 为了开始调查这个问题,我会检查以下信息(也许为了调试我会创建每隔几分钟安排一次的任务):

  1. 检查task_history并验证执行

    select *
       from table(information_schema.task_history(
       scheduled_time_range_start=>dateadd('hour',-1,current_timestamp()),
       result_limit => 100,
       task_name=>'YOUR_TASK_NAME'));
    
  2. 如果上一步成功,检查copy_history并验证输入文件名、目标table和[=的数量32=] 是预期的

    SELECT *
    FROM TABLE (information_schema.copy_history(TABLE_NAME => 'YOUR_TABLE_NAME',
                start_time=> dateadd(hours, -1, current_timestamp())))
    ORDER BY 3 DESC;
    

检查结果是否与执行带有 sp 调用的任务时得到的结果相同。

另请确认您正在使用 COPY 命令加载尚未加载到 table 的新文件(否则您需要在复制命令中指定 FORCE = TRUE 参数或删除截断目标的元数据信息 table 重新加载相同的文件)。