使用 jinja 动态摄取模式列表以在 dbt 中查询
Dynamically ingesting list of schemas to query in dbt using jinja
我正在构建一个从多个模式中提取数据的数据管道。我正在尝试使用 jinja 来获取模式名称列表,然后在将数据合并到单个暂存区 table 之前查询所有这些模式中 table(具有相同名称)中的特定列。
在概念层面上,我知道我需要使用 Jinja 创建两个列表:一个包含我想要循环的 schema
名称的列表,以及一个我想要循环的列名列表在我正在查询的所有模式中存在的 table 中查询。我们调用模式名称列表 schema_names
和列名称列表 column_names
.
{% set var schema_names=('brand1', 'brand2', 'brand3') %}
{% set var column_names=('col1', 'col2', 'col3') %}
理论上,下一步似乎是我应该通过 Jinja 将这些值传递到一个查询中,该查询从 schema_vars
中的模式中的 tables 选择相同的列列表。如果我手动编写 SQL 查询,我会使用几个 WITH
语句,然后使用 UNION ALL
:
组合这些语句
/* Query that creates a variable with value `brand1` while querying `brand1` schema */
WITH schema1_data
AS (
SELECT 'brand1' AS schema_name,
col1,
col2,
col3
FROM brand1.table_name
),
/* Query that creates a variable with value `brand2` while querying `brand2` schema */
schema2_data
AS (
SELECT 'brand2' AS schema_name,
col1,
col2,
col3
FROM brand2.table_name
),
/* Query that creates a variable with value `brand3` while querying `brand3` schema */
schema3_data
AS (
SELECT 'brand3' AS schema_name,
col1,
col2,
col3
FROM brand3.table_name
),
/* Union statement combining identical tables from 3 schemas */
combined_schemas
AS (
SELECT *
FROM schema1_data
UNION ALL
(
SELECT *
FROM schema2_data)
UNION ALL
(
SELECT *
FROM schema3_data)
)
SELECT *
FROM combined_schemas
关于如何在 Jinja/dbt 中最好地实现这一点的想法?也对问题的替代框架持开放态度,这些框架采用不同的方法将来自不同模式中的相同 table 的信息组合到单个阶段 table 中,以供数据管道处理。
您的 UNION ALL 方法听起来不错。您可以稍微简化您的计划:如果您希望每个模型的列相同,那么您不需要 column_names
变量。此外,使用 CTE 仅在您手动编写时才有帮助,但使用 jinja 模板实际上会使您的生活更加艰难,因此不要强迫自己使用它们。
您将需要:
- a
jinja
for 循环记录在案 here.
- 要在 for 循环块中访问的
loop.last
特殊变量
我没有测试过以下代码,但我会这样写:
{% set var schema_names=('brand1', 'brand2', 'brand3') %}
{% for schema in schema_names %}
(
select
'{{ schema }}' as schema_name,
col1,
col2,
col3
from {{ schema }}.table_name
)
{% if not loop.last %}
union all
{% endif %}
{% endfor %}
如果您不想自己编写代码,dbt_utils
已经实现了此功能:union_relations
。
虽然,您可以参考它们的实现(比您需要的更复杂,因为它们处理更多的极端情况)会很有趣,您可以找到 here.
我正在构建一个从多个模式中提取数据的数据管道。我正在尝试使用 jinja 来获取模式名称列表,然后在将数据合并到单个暂存区 table 之前查询所有这些模式中 table(具有相同名称)中的特定列。
在概念层面上,我知道我需要使用 Jinja 创建两个列表:一个包含我想要循环的 schema
名称的列表,以及一个我想要循环的列名列表在我正在查询的所有模式中存在的 table 中查询。我们调用模式名称列表 schema_names
和列名称列表 column_names
.
{% set var schema_names=('brand1', 'brand2', 'brand3') %}
{% set var column_names=('col1', 'col2', 'col3') %}
理论上,下一步似乎是我应该通过 Jinja 将这些值传递到一个查询中,该查询从 schema_vars
中的模式中的 tables 选择相同的列列表。如果我手动编写 SQL 查询,我会使用几个 WITH
语句,然后使用 UNION ALL
:
/* Query that creates a variable with value `brand1` while querying `brand1` schema */
WITH schema1_data
AS (
SELECT 'brand1' AS schema_name,
col1,
col2,
col3
FROM brand1.table_name
),
/* Query that creates a variable with value `brand2` while querying `brand2` schema */
schema2_data
AS (
SELECT 'brand2' AS schema_name,
col1,
col2,
col3
FROM brand2.table_name
),
/* Query that creates a variable with value `brand3` while querying `brand3` schema */
schema3_data
AS (
SELECT 'brand3' AS schema_name,
col1,
col2,
col3
FROM brand3.table_name
),
/* Union statement combining identical tables from 3 schemas */
combined_schemas
AS (
SELECT *
FROM schema1_data
UNION ALL
(
SELECT *
FROM schema2_data)
UNION ALL
(
SELECT *
FROM schema3_data)
)
SELECT *
FROM combined_schemas
关于如何在 Jinja/dbt 中最好地实现这一点的想法?也对问题的替代框架持开放态度,这些框架采用不同的方法将来自不同模式中的相同 table 的信息组合到单个阶段 table 中,以供数据管道处理。
您的 UNION ALL 方法听起来不错。您可以稍微简化您的计划:如果您希望每个模型的列相同,那么您不需要 column_names
变量。此外,使用 CTE 仅在您手动编写时才有帮助,但使用 jinja 模板实际上会使您的生活更加艰难,因此不要强迫自己使用它们。
您将需要:
- a
jinja
for 循环记录在案 here. - 要在 for 循环块中访问的
loop.last
特殊变量
我没有测试过以下代码,但我会这样写:
{% set var schema_names=('brand1', 'brand2', 'brand3') %}
{% for schema in schema_names %}
(
select
'{{ schema }}' as schema_name,
col1,
col2,
col3
from {{ schema }}.table_name
)
{% if not loop.last %}
union all
{% endif %}
{% endfor %}
如果您不想自己编写代码,dbt_utils
已经实现了此功能:union_relations
。
虽然,您可以参考它们的实现(比您需要的更复杂,因为它们处理更多的极端情况)会很有趣,您可以找到 here.