How to pivot on dynamic values in Snowflake

I want to pivot a table on a field that can contain "dynamic" values (values that are not always known in advance).

I can make it work by hard-coding the values (which is undesirable):

SELECT *
FROM my_table
  pivot(SUM(amount) FOR type_id IN (1,2,3,4,5,20,50,83,141,...));

But I can't supply the values dynamically with a query:

SELECT *
FROM my_table
  pivot(SUM(amount) FOR type_id IN (SELECT id FROM types));
---
090150 (22000): Single-row subquery returns more than one row. 

SELECT *
FROM my_table
  pivot(SUM(amount) FOR type_id IN (SELECT ARRAY_AGG(id) FROM types));
---
001038 (22023): SQL compilation error:                                          
Can not convert parameter 'my_table.type_id' of type [NUMBER(38,0)] into expected type [ARRAY]

Is there a way to accomplish this?

I don't think this is possible in native SQL, but I wrote an article and published some code showing how my team does it by generating the query from Python.

You can call the Python script directly, passing arguments similar to the options Excel gives you for pivot tables:

python generate_pivot_query.py                  \
    --dbtype snowflake --database mydb          \
    --host myhost.url --port 5432               \
    --user me --password myp4ssw0rd             \
    --base-columns customer_id                  \
    --pivot-columns category                    \
    --exclude-columns order_id                  \
    --aggfunction-mappings amount=sum           \
    myschema orders

Or, if you use Airflow, you can use the CreatePivotTableOperator to create tasks directly.
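The core idea of generating the query on the client can be illustrated with a minimal sketch. It is written in JavaScript rather than Python, to match the stored procedures elsewhere in this thread, and it is not the code from the article: `buildPivotQuery` and `sqlLiteral` are hypothetical names, and the sketch assumes the distinct pivot values were already fetched with a separate query such as `SELECT DISTINCT id FROM types`.

```javascript
// Hypothetical helpers, not part of the article's code.
// Assumes table and column names are trusted identifiers (they are not escaped).

// Turn a JavaScript value into a SQL literal for the IN (...) list.
function sqlLiteral(value) {
  // Finite numbers are emitted as-is.
  if (typeof value === 'number' && Number.isFinite(value)) return String(value);
  // Everything else becomes a quoted string with single quotes doubled.
  return "'" + String(value).replace(/'/g, "''") + "'";
}

// Build a PIVOT query whose IN list is filled from values fetched beforehand.
function buildPivotQuery(table, aggExpr, pivotColumn, values) {
  if (values.length === 0) {
    throw new Error('PIVOT needs at least one value');
  }
  const inList = values.map(sqlLiteral).join(', ');
  return `SELECT *\nFROM ${table}\n  PIVOT(${aggExpr} FOR ${pivotColumn} IN (${inList}));`;
}

// The values would normally come from: SELECT DISTINCT id FROM types
console.log(buildPivotQuery('my_table', 'SUM(amount)', 'type_id', [1, 2, 3]));
```

The generated text is then sent to Snowflake as an ordinary query, so it takes two round trips: one to read the distinct values and one to run the pivot.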

I wrote a Snowflake stored procedure to get dynamic pivots inside Snowflake.

3 steps:

  1. Run a query
  2. Call the stored procedure: call pivot_prev_results()
  3. Fetch the results: select * from table(result_scan(last_query_id(-2)))

The procedure:

create or replace procedure pivot_prev_results()
returns string
language javascript
execute as caller as
$$
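  // Build a quoted, comma-separated list of the distinct pivot_column values.
  // Backslashes are doubled so that \' reaches Snowflake as an escaped quote.
  var cols_query = `
      select '\\'' 
        || listagg(distinct pivot_column, '\\',\\'') within group (order by pivot_column)
        || '\\'' 
      from table(result_scan(last_query_id(-1)))
  `;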
  var stmt1 = snowflake.createStatement({sqlText: cols_query});
  var results1 = stmt1.execute();
  results1.next();
  var col_list = results1.getColumnValue(1);
  
  // Pivot the caller's previous result on the collected values.
  var pivot_query = `
         select * 
         from (select * from table(result_scan(last_query_id(-2)))) 
         pivot(max(pivot_value) for pivot_column in (${col_list}))
     `;
  var stmt2 = snowflake.createStatement({sqlText: pivot_query});
  stmt2.execute();
  return `select * from table(result_scan('${stmt2.getQueryId()}'));\n  select * from table(result_scan(last_query_id(-2)));`;
$$;
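The procedure reads columns named pivot_column and pivot_value from the previous result, so the query in step 1 has to expose them under those names. A minimal sketch of the three statements, in order, might look like the following. `pivotPrevResultsSteps` is a hypothetical helper, and the table and column names (myschema.orders, customer_id, category, amount) are illustrative only. Because the procedure quotes the collected values as strings, the sketch assumes the pivoted column is text.

```javascript
// Hypothetical helper: returns the statements to run, in order, in one session.
// Assumes identifiers are trusted; nothing is escaped here.
function pivotPrevResultsSteps(source, keyColumns, pivotColumn, valueColumn) {
  const selected = [
    ...keyColumns,
    // The procedure looks for exactly these two aliases.
    `${pivotColumn} as pivot_column`,
    `${valueColumn} as pivot_value`,
  ].join(', ');
  return [
    // Step 1: the query whose result will be pivoted.
    `select ${selected} from ${source};`,
    // Step 2: pivot the previous result.
    'call pivot_prev_results();',
    // Step 3: read the pivoted result, as described in the steps above.
    'select * from table(result_scan(last_query_id(-2)));',
  ];
}

console.log(
  pivotPrevResultsSteps('myschema.orders', ['customer_id'], 'category', 'amount').join('\n')
);
```

The statements rely on last_query_id offsets, so they need to run back to back in the same session with no other queries in between.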

Inspired by the two answers before mine, I created another stored procedure that can be called to build emulated pivot queries, even multi-grouped and multi-pivot ones:

create or replace procedure
  test_pivot.public.get_full_pivot(
    "source" varchar,      // fully-qualified 'table/view_name' or full '(subquery)'
    "row_headers" varchar, // comma-separated list of 1+ GROUP BY field names
    "col_header1" varchar, // first (mandatory) PIVOT field name
    "col_header2" varchar, // secondary (optional) PIVOT field name ('' if none)
    "agg" varchar,         // field name for the aggregate values
    "aggf" varchar)        // aggregate function (sum, avg, min, max, count...)
  returns varchar
  language javascript
as $$
  // collect all distinct values for a column header field
  function get_distinct_values(col_header) {
    var vals = [];
    if (col_header != '') {
      var result = snowflake.execute(
        {sqlText: `select distinct ${col_header}\n`
          + `from ${source}\n`
          + `order by ${col_header}`}); // nulls last!
      while (result.next())
        vals.push(result.getColumnValueAsString(1));
    }
    return vals;
  }
  var vals1 = get_distinct_values(col_header1);
  var vals2 = get_distinct_values(col_header2);
  
  // create and return the emulated pivot query, for one or two column header values
  var query = `select ${row_headers}`;
  if (vals2.length == 0)
    for (const i in vals1) {
      var cond1 = (vals1[i] == 'null'
        ? `${col_header1} is null` : `to_char(${col_header1})='${vals1[i]}'`);
      query += `,\n  ${aggf}(iff(${cond1}, ${agg}, null)) as "${vals1[i]}"`;
    }
  else
    for (const i in vals1)
      for (const j in vals2) {
        var cond1 = (vals1[i] == 'null'
          ? `${col_header1} is null` : `to_char(${col_header1})='${vals1[i]}'`);
        var cond2 = (vals2[j] == 'null'
          ? `${col_header2} is null` : `to_char(${col_header2})='${vals2[j]}'`);
        query += `,\n  ${aggf}(iff(${cond1} AND ${cond2}, ${agg}, null)) as "${vals1[i]}+${vals2[j]}"`;
      }
  query += `\nfrom ${source}\n`
    + `group by ${row_headers}\n`
    + `order by ${row_headers};`; // nulls last!
  return query;
$$;
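The query construction can also be tried outside Snowflake. The following is a minimal sketch, not the procedure itself: `buildEmulatedPivot` and `condition` are hypothetical names, the distinct values are passed in as arrays instead of being read with `select distinct`, and a JavaScript `null` stands for a SQL NULL. Unlike the procedure, it doubles single quotes inside values; column aliases are still not escaped.

```javascript
// Hypothetical offline version of the query construction.
// Assumes source, column names and aggf are trusted identifiers.

// Build the match condition for one column header value (null means SQL NULL).
function condition(column, value) {
  return value === null
    ? `${column} is null`
    : `to_char(${column})='${String(value).replace(/'/g, "''")}'`;
}

function buildEmulatedPivot({
  source, rowHeaders, colHeader1, vals1, colHeader2 = '', vals2 = [], agg, aggf,
}) {
  const columns = [];
  for (const v1 of vals1) {
    if (vals2.length === 0) {
      // Single pivot field: one output column per value.
      columns.push(`${aggf}(iff(${condition(colHeader1, v1)}, ${agg}, null)) as "${v1}"`);
      continue;
    }
    // Two pivot fields: one output column per combination of values.
    for (const v2 of vals2) {
      const cond = `${condition(colHeader1, v1)} AND ${condition(colHeader2, v2)}`;
      columns.push(`${aggf}(iff(${cond}, ${agg}, null)) as "${v1}+${v2}"`);
    }
  }
  return [`select ${rowHeaders}`, ...columns.map((c) => `  ${c}`)].join(',\n')
    + `\nfrom ${source}\ngroup by ${rowHeaders}\norder by ${rowHeaders};`;
}

console.log(buildEmulatedPivot({
  source: 'test_pivot.public.demographics',
  rowHeaders: 'country, education',
  colHeader1: 'status', vals1: ['divorced', 'married', 'single', null],
  colHeader2: 'gender', vals2: ['F', 'M'],
  agg: 'sales', aggf: 'sum',
}));
```

Each output column is an aggregate over an iff expression, which is why this approach needs no PIVOT clause and can combine two column header fields.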

Calling the stored procedure like this:

call test_pivot.public.get_full_pivot(
  'test_pivot.public.demographics',
  'country, education', 'status', 'gender', 'sales', 'sum');

generates the following SQL:

select country, education,
  sum(iff(to_char(status)='divorced' AND to_char(gender)='F', sales, null)) as "divorced+F",
  sum(iff(to_char(status)='divorced' AND to_char(gender)='M', sales, null)) as "divorced+M",
  sum(iff(to_char(status)='married' AND to_char(gender)='F', sales, null)) as "married+F",
  sum(iff(to_char(status)='married' AND to_char(gender)='M', sales, null)) as "married+M",
  sum(iff(to_char(status)='single' AND to_char(gender)='F', sales, null)) as "single+F",
  sum(iff(to_char(status)='single' AND to_char(gender)='M', sales, null)) as "single+M",
  sum(iff(status is null AND to_char(gender)='F', sales, null)) as "null+F",
  sum(iff(status is null AND to_char(gender)='M', sales, null)) as "null+M"
from test_pivot.public.demographics
group by country, education
order by country, education;

The generated query may return a result structured like this: