Create stage to copy data from S3 to Snowflake from within Snowflake Scripting block?

I'm trying to convert an existing stored procedure from JavaScript to Snowflake Scripting. I can't find any documentation on how to create a stage so that I can copy data from S3 into Snowflake. Can anyone advise me, or point me to documentation or a tutorial that shows how to accomplish this?

This is how I do it in JavaScript (not the complete code, just included for context).

$$
var create_replace_stage_cmd = `
create or replace stage my_stage
url = 's3://path/value1=${value1}/value2=${value2}/'
storage_integration = my_storage_integration;
`

var copy_into_table_cmd = `
copy into my_table(my_data)
from @my_stage
file_format='JSON_FILE_FORMAT'
`

    try {
        var sql_create_replace_stage_cmd = snowflake.createStatement({
            sqlText: create_replace_stage_cmd
        });
        var create_replace_stage_result = sql_create_replace_stage_cmd.execute();
        result += "\n Create stage command succeeded";
    } catch (err) {
        result += "\n Create stage command failed: " + err.code + "\n  State: " + err.state;
        result += "\n  Message: " + err.message;
        result += "\n Stack Trace:\n" + err.stackTraceTxt;
    }

    try {
        var sql_copy_into_table_cmd = snowflake.createStatement({
            sqlText: copy_into_table_cmd
        });
        var copy_into_table_result = sql_copy_into_table_cmd.execute();
        result += "\n Copy into table command succeeded";
    } catch (err) {
        result += "\n Copy into table command failed: " + err.code + "\n  State: " + err.state;
        result += "\n  Message: " + err.message;
        result += "\n Stack Trace:\n" + err.stackTraceTxt;
    }

    return result; 
$$

The Snowflake Scripting rewrite, so far:

CREATE OR REPLACE PROCEDURE my_stored_procedure("my_value" varchar) 
returns varchar 
language sql 
as 
$$ 
DECLARE 
current_day varchar; 
current_month varchar; 
current_year varchar; 
current_value VARCHAR; 
    
BEGIN
        
SELECT date_part(day, current_timestamp()) INTO :current_day;
SELECT date_part(month, current_timestamp()) INTO :current_month;
SELECT date_part(year, current_timestamp()) INTO :current_year;
current_value := "my_value";
        
        
/* 1. Trying to create a stage which includes dynamic current_day, 
 *   current_month etc and the values passed when the stored procedure 
 *   is called, in the URL part. The stage name will be consistent but 
 *   the URL part will change depending on the date and what is passed 
 *   when the stored procedure is called.  
 *        
 * 2. Need to copy data from this stage to my snowflake database table
 */
    
RETURN current_value; 
exception   
    when statement_error then
            return object_construct('Error type', 'STATEMENT_ERROR',
                                    'SQLCODE', sqlcode,
                                    'SQLERRM', sqlerrm,
                                    'SQLSTATE', sqlstate);
END; 
$$;
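Filling in steps 1 and 2 from the comment block: one way is to build the statements as strings and run them with EXECUTE IMMEDIATE, which Snowflake Scripting supports. This is a sketch, not tested against a real integration; the bucket layout and the stage, table, storage integration, and file format names are carried over from the JavaScript version as assumptions:

CREATE OR REPLACE PROCEDURE my_stored_procedure("my_value" varchar)
returns varchar
language sql
as
$$
DECLARE
  stage_ddl varchar;
BEGIN
  -- Build the CREATE STAGE statement as a string so the URL can embed
  -- the current date and the value passed in (the path layout is an assumption).
  stage_ddl := 'CREATE OR REPLACE STAGE my_stage URL = ''s3://bucket_name/data/'
            || TO_CHAR(current_timestamp(), 'YYYY/MM/DD') || '/' || "my_value"
            || '/'' STORAGE_INTEGRATION = my_storage_integration';
  EXECUTE IMMEDIATE stage_ddl;

  -- Then copy from the freshly (re)created stage.
  EXECUTE IMMEDIATE 'COPY INTO my_table(my_data) FROM @my_stage '
                 || 'FILE_FORMAT = (FORMAT_NAME = ''JSON_FILE_FORMAT'')';
  RETURN 'done';
exception
  when statement_error then
    return object_construct('Error type', 'STATEMENT_ERROR',
                            'SQLCODE', sqlcode,
                            'SQLERRM', sqlerrm,
                            'SQLSTATE', sqlstate);
END;
$$;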

This will be called on a schedule by a Snowflake task.

CALL my_stored_procedure('value');

So first, testing "create a stage with some date prefix" the normal way:

CREATE STAGE aws_3s_a_2022_02_19 URL = 's3://bucket_name/data/2022/02/19';

and all goes well.

So now I'll do the same thing via a Snowflake Scripting block:

execute immediate 
$$
begin
  create stage aws_3s_b_tmp URL = 's3://bucket_name/data/2022/02/19';
end;
$$
;
show stages;
created_on name database_name schema_name url
2022-02-19 00:32:46.278 -0800 AWS_3S_A_2022_02_19 TEST PUBLIC s3://bucket_name/data/2022/02/19
2022-02-19 00:32:41.008 -0800 AWS_3S_B_TMP TEST PUBLIC s3://bucket_name/data/2022/02/19

So creating a stage from a Snowflake Scripting block works.

Fumbling around some more... I found I can set a session variable and use it:

set stage_name = 's3://bucket_name/data/2022/02/19';
create stage aws_3s_c_tmp URL = $stage_name;

So what can be done is a script like:

$$
begin
  set stage_name = 's3://bucket_name/data/2022/02/19';
  create stage aws_4s_c_tmp URL = $stage_name;
end;
$$
;
created_on name database_name schema_name url
2022-02-19 00:32:46.278 -0800 AWS_3S_A_2022_02_19 TEST PUBLIC s3://bucket_name/data/2022/02/19
2022-02-19 00:32:41.008 -0800 AWS_3S_B_TMP TEST PUBLIC s3://bucket_name/data/2022/02/19
2022-02-19 00:39:55.741 -0800 AWS_3S_C_TMP TEST PUBLIC s3://bucket_name/data/2022/02/19
2022-02-19 00:41:04.685 -0800 AWS_4S_C_TMP TEST PUBLIC s3://bucket_name/data/2022/02/19

But the session variable has to be set before the Snowflake Scripting block runs (the above only worked because the session variable had already been created outside the block's scope)...
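So a version that should behave, as a sketch, assuming the session variable is created first, outside the block (the stage name aws_3s_d_tmp is just a fresh placeholder):

-- set in the session, before the block runs
set stage_name = 's3://bucket_name/data/2022/02/19';

execute immediate
$$
begin
  -- $stage_name resolves here because it already exists in the session
  create stage aws_3s_d_tmp URL = $stage_name;
end;
$$
;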

But my original point was that you don't need a stage at all; you can COPY from the full bucket name/path.

Thus:

execute immediate 
$$
begin
  COPY INTO test.public.the_table FROM 's3://bucket_name/data/' PATTERN = '.*[.]json';
end;
$$
;

Anyway... I tried to get this to work... but it just won't play ball:

execute immediate 
$$
declare
    bucket_name text;    
begin
  bucket_name := 's3://bucket_name/data/';

  COPY INTO THE_TABLES FROM :bucket_name PATTERN = '.*[.]json';
end;
$$
;
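The likely culprit is that COPY INTO does not accept a bind variable (:bucket_name) as its location. A sketch of a workaround, hedged on the same placeholder names, is to splice the URL into the statement text and run it with EXECUTE IMMEDIATE:

execute immediate
$$
declare
  bucket_name text;
begin
  bucket_name := 's3://bucket_name/data/';
  -- the location cannot be bound, so build the COPY statement as a string
  execute immediate 'COPY INTO THE_TABLES FROM ''' || bucket_name
                 || ''' PATTERN = ''.*[.]json''';
end;
$$
;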

But if you have a JavaScript stored procedure that works, I would suggest you use that. We built a very similar load pattern (but all the SQL was written in JavaScript) to manage chunking the data we last loaded, so I understand what you are trying to do.

I'd rather ask why JavaScript doesn't work for you, because raw SQL doesn't "seem" to want to be the input to string construction. But I'm open to the idea; I haven't gotten my head around Scripting yet.