如何以 Snowflake 中的动态值为中心
How to pivot on dynamic values in Snowflake
我想根据可以包含“动态”值(事先并不总是已知)的字段来转换 table。
我可以通过对值进行硬编码来使其工作(这是不可取的):
SELECT *
FROM my_table
pivot(SUM(amount) FOR type_id IN (1,2,3,4,5,20,50,83,141,...));
但我无法使用查询来动态提供值:
SELECT *
FROM my_table
pivot(SUM(amount) FOR type_id IN (SELECT id FROM types));
---
090150 (22000): Single-row subquery returns more than one row.
SELECT *
FROM my_table
pivot(SUM(amount) FOR type_id IN (SELECT ARRAY_AGG(id) FROM types));
---
001038 (22023): SQL compilation error:
Can not convert parameter 'my_table.type_id' of type [NUMBER(38,0)] into expected type [ARRAY]
有没有办法做到这一点?
我认为在原生 SQL 中不可能,但我编写了 an article 并发布了一些代码,展示了我的团队如何通过从 Python 生成查询来做到这一点。
您可以直接调用 Python 脚本,传递类似于选项 Excel 的参数,为您提供数据透视表:
python generate_pivot_query.py \
--dbtype snowflake --database mydb \
--host myhost.url --port 5432 \
--user me --password myp4ssw0rd \
--base-columns customer_id \
--pivot-columns category \
--exclude-columns order_id \
--aggfunction-mappings amount=sum \
myschema orders
或者,如果您是 Airflow,您可以使用 CreatePivotTableOperator
直接创建任务。
我写了一个 Snowflake 存储过程来获取 Snowflake 内部的动态枢轴,检查:
3 个步骤:
- 查询
- 调用存储过程
call pivot_prev_results()
- 查找结果
select * from table(result_scan(last_query_id(-2)))
程序:
create or replace procedure pivot_prev_results()
returns string
language javascript
execute as caller as
$$
var cols_query = `
select '\''
|| listagg(distinct pivot_column, '\',\'') within group (order by pivot_column)
|| '\''
from table(result_scan(last_query_id(-1)))
`;
var stmt1 = snowflake.createStatement({sqlText: cols_query});
var results1 = stmt1.execute();
results1.next();
var col_list = results1.getColumnValue(1);
pivot_query = `
select *
from (select * from table(result_scan(last_query_id(-2))))
pivot(max(pivot_value) for pivot_column in (${col_list}))
`
var stmt2 = snowflake.createStatement({sqlText: pivot_query});
stmt2.execute();
return `select * from table(result_scan('${stmt2.getQueryId()}'));\n select * from table(result_scan(last_query_id(-2)));`;
$$;
受我两位前辈的启发,我创建了另一个存储过程,可以调用它来创建甚至 multi-grouped 和 multi-pivot 模拟数据透视查询:
create or replace procedure
test_pivot.public.get_full_pivot(
"source" varchar, // fully-qualified 'table/view_name' or full '(subquery)'
"row_headers" varchar, // comma-separated list of 1+ GROUP BY field names
"col_header1" varchar, // first (mandatory) PIVOT field name
"col_header2" varchar, // secondary (optional) PIVOT field name ('' if none)
"agg" varchar, // field name for the aggregate values
"aggf" varchar) // aggregate function (sum, avg, min, max, count...)
returns varchar
language javascript
as $$
// collect all distinct values for a column header field
function get_distinct_values(col_header) {
var vals = [];
if (col_header != '') {
var result = snowflake.execute(
{sqlText: `select distinct ${col_header}\n`
+ `from ${source}\n`
+ `order by ${col_header}`}); // nulls last!
while (result.next())
vals.push(result.getColumnValueAsString(1));
}
return vals;
}
var vals1 = get_distinct_values(col_header1);
var vals2 = get_distinct_values(col_header2);
// create and return the emulated pivot query, for one or two column header values
var query = `select ${row_headers}`;
if (vals2.length == 0)
for (const i in vals1) {
var cond1 = (vals1[i] == 'null'
? `${col_header1} is null` : `to_char(${col_header1})='${vals1[i]}'`);
query += `,\n ${aggf}(iff(${cond1}, ${agg}, null)) as "${vals1[i]}"`;
}
else
for (const i in vals1)
for (const j in vals2) {
var cond1 = (vals1[i] == 'null'
? `${col_header1} is null` : `to_char(${col_header1})='${vals1[i]}'`);
var cond2 = (vals2[j] == 'null'
? `${col_header2} is null` : `to_char(${col_header2})='${vals2[j]}'`);
query += `,\n ${aggf}(iff(${cond1} AND ${cond2}, ${agg}, null)) as "${vals1[i]}+${vals2[j]}"`;
}
query += `\nfrom ${source}\n`
+ `group by ${row_headers}\n`
+ `order by ${row_headers};`; // nulls last!
return query;
$$;
致电:
call test_pivot.public.get_full_pivot(
'test_pivot.public.demographics',
'country, education', 'status', 'gender', 'sales', 'sum');
生成以下内容SQL:
select country, education,
sum(iff(to_char(status)='divorced' AND to_char(gender)='F', sales, null)) as "divorced+F",
sum(iff(to_char(status)='divorced' AND to_char(gender)='M', sales, null)) as "divorced+M",
sum(iff(to_char(status)='married' AND to_char(gender)='F', sales, null)) as "married+F",
sum(iff(to_char(status)='married' AND to_char(gender)='M', sales, null)) as "married+M",
sum(iff(to_char(status)='single' AND to_char(gender)='F', sales, null)) as "single+F",
sum(iff(to_char(status)='single' AND to_char(gender)='M', sales, null)) as "single+M",
sum(iff(status is null AND to_char(gender)='F', sales, null)) as "null+F",
sum(iff(status is null AND to_char(gender)='M', sales, null)) as "null+M"
from test_pivot.public.demographics
group by country, education
order by country, education;
可能 return 结果结构如下:
我想根据可以包含“动态”值(事先并不总是已知)的字段来转换 table。
我可以通过对值进行硬编码来使其工作(这是不可取的):
SELECT *
FROM my_table
pivot(SUM(amount) FOR type_id IN (1,2,3,4,5,20,50,83,141,...));
但我无法使用查询来动态提供值:
SELECT *
FROM my_table
pivot(SUM(amount) FOR type_id IN (SELECT id FROM types));
---
090150 (22000): Single-row subquery returns more than one row.
SELECT *
FROM my_table
pivot(SUM(amount) FOR type_id IN (SELECT ARRAY_AGG(id) FROM types));
---
001038 (22023): SQL compilation error:
Can not convert parameter 'my_table.type_id' of type [NUMBER(38,0)] into expected type [ARRAY]
有没有办法做到这一点?
我认为在原生 SQL 中不可能,但我编写了 an article 并发布了一些代码,展示了我的团队如何通过从 Python 生成查询来做到这一点。
您可以直接调用 Python 脚本,传递类似于选项 Excel 的参数,为您提供数据透视表:
python generate_pivot_query.py \
--dbtype snowflake --database mydb \
--host myhost.url --port 5432 \
--user me --password myp4ssw0rd \
--base-columns customer_id \
--pivot-columns category \
--exclude-columns order_id \
--aggfunction-mappings amount=sum \
myschema orders
或者,如果您是 Airflow,您可以使用 CreatePivotTableOperator
直接创建任务。
我写了一个 Snowflake 存储过程来获取 Snowflake 内部的动态枢轴,检查:
3 个步骤:
- 查询
- 调用存储过程
call pivot_prev_results()
- 查找结果
select * from table(result_scan(last_query_id(-2)))
程序:
create or replace procedure pivot_prev_results()
returns string
language javascript
execute as caller as
$$
var cols_query = `
select '\''
|| listagg(distinct pivot_column, '\',\'') within group (order by pivot_column)
|| '\''
from table(result_scan(last_query_id(-1)))
`;
var stmt1 = snowflake.createStatement({sqlText: cols_query});
var results1 = stmt1.execute();
results1.next();
var col_list = results1.getColumnValue(1);
pivot_query = `
select *
from (select * from table(result_scan(last_query_id(-2))))
pivot(max(pivot_value) for pivot_column in (${col_list}))
`
var stmt2 = snowflake.createStatement({sqlText: pivot_query});
stmt2.execute();
return `select * from table(result_scan('${stmt2.getQueryId()}'));\n select * from table(result_scan(last_query_id(-2)));`;
$$;
受我两位前辈的启发,我创建了另一个存储过程,可以调用它来创建甚至 multi-grouped 和 multi-pivot 模拟数据透视查询:
create or replace procedure
test_pivot.public.get_full_pivot(
"source" varchar, // fully-qualified 'table/view_name' or full '(subquery)'
"row_headers" varchar, // comma-separated list of 1+ GROUP BY field names
"col_header1" varchar, // first (mandatory) PIVOT field name
"col_header2" varchar, // secondary (optional) PIVOT field name ('' if none)
"agg" varchar, // field name for the aggregate values
"aggf" varchar) // aggregate function (sum, avg, min, max, count...)
returns varchar
language javascript
as $$
// collect all distinct values for a column header field
function get_distinct_values(col_header) {
var vals = [];
if (col_header != '') {
var result = snowflake.execute(
{sqlText: `select distinct ${col_header}\n`
+ `from ${source}\n`
+ `order by ${col_header}`}); // nulls last!
while (result.next())
vals.push(result.getColumnValueAsString(1));
}
return vals;
}
var vals1 = get_distinct_values(col_header1);
var vals2 = get_distinct_values(col_header2);
// create and return the emulated pivot query, for one or two column header values
var query = `select ${row_headers}`;
if (vals2.length == 0)
for (const i in vals1) {
var cond1 = (vals1[i] == 'null'
? `${col_header1} is null` : `to_char(${col_header1})='${vals1[i]}'`);
query += `,\n ${aggf}(iff(${cond1}, ${agg}, null)) as "${vals1[i]}"`;
}
else
for (const i in vals1)
for (const j in vals2) {
var cond1 = (vals1[i] == 'null'
? `${col_header1} is null` : `to_char(${col_header1})='${vals1[i]}'`);
var cond2 = (vals2[j] == 'null'
? `${col_header2} is null` : `to_char(${col_header2})='${vals2[j]}'`);
query += `,\n ${aggf}(iff(${cond1} AND ${cond2}, ${agg}, null)) as "${vals1[i]}+${vals2[j]}"`;
}
query += `\nfrom ${source}\n`
+ `group by ${row_headers}\n`
+ `order by ${row_headers};`; // nulls last!
return query;
$$;
致电:
call test_pivot.public.get_full_pivot(
'test_pivot.public.demographics',
'country, education', 'status', 'gender', 'sales', 'sum');
生成以下内容SQL:
select country, education,
sum(iff(to_char(status)='divorced' AND to_char(gender)='F', sales, null)) as "divorced+F",
sum(iff(to_char(status)='divorced' AND to_char(gender)='M', sales, null)) as "divorced+M",
sum(iff(to_char(status)='married' AND to_char(gender)='F', sales, null)) as "married+F",
sum(iff(to_char(status)='married' AND to_char(gender)='M', sales, null)) as "married+M",
sum(iff(to_char(status)='single' AND to_char(gender)='F', sales, null)) as "single+F",
sum(iff(to_char(status)='single' AND to_char(gender)='M', sales, null)) as "single+M",
sum(iff(status is null AND to_char(gender)='F', sales, null)) as "null+F",
sum(iff(status is null AND to_char(gender)='M', sales, null)) as "null+M"
from test_pivot.public.demographics
group by country, education
order by country, education;
可能 return 结果结构如下: