使用 jinja 动态摄取模式列表以在 dbt 中查询

Dynamically ingesting list of schemas to query in dbt using jinja

我正在构建一个从多个模式中提取数据的数据管道。我正在尝试使用 jinja 来获取模式名称列表,然后在将数据合并到单个暂存区 table 之前查询所有这些模式中 table(具有相同名称)中的特定列。

在概念层面上,我知道我需要使用 Jinja 创建两个列表:一个包含我想要循环的 schema 名称的列表,以及一个我想要循环的列名列表在我正在查询的所有模式中存在的 table 中查询。我们调用模式名称列表 schema_names 和列名称列表 column_names.

理论上,下一步似乎是我应该通过 Jinja 将这些值传递到一个查询中,该查询从 schema_vars 中的模式中的 tables 选择相同的列列表。如果我手动编写 SQL 查询,我会使用几个 WITH 语句,然后使用 UNION ALL:

组合这些语句
/* Query that creates a variable with value `brand1` while querying `brand1` schema */
WITH schema1_data
       AS (
            SELECT 'brand1' AS schema_name,
                   col1,
                   col2,
                   col3
            FROM brand1.table_name
  ),

/* Query that creates a variable with value `brand2` while querying `brand2` schema */
     schema2_data
       AS (
            SELECT 'brand2' AS schema_name,
                   col1,
                   col2,
                   col3
            FROM brand2.table_name
     ),

  /* Query that creates a variable with value `brand3` while querying `brand3` schema */
     schema3_data
       AS (
            SELECT 'brand3' AS schema_name,
                   col1,
                   col2,
                   col3
            FROM brand3.table_name
     ),
  /* Union statement combining identical tables from 3 schemas */
     combined_schemas
       AS (
            SELECT *
            FROM schema1_data
            UNION ALL
            (
              SELECT *
              FROM schema2_data)
            UNION ALL
            (
              SELECT *
              FROM schema3_data)
     )

SELECT *
FROM combined_schemas

关于如何在 Jinja/dbt 中最好地实现这一点的想法?也对问题的替代框架持开放态度,这些框架采用不同的方法将来自不同模式中的相同 table 的信息组合到单个阶段 table 中,以供数据管道处理。

您的 UNION ALL 方法听起来不错。您可以稍微简化您的计划:如果您希望每个模型的列相同,那么您不需要 column_names 变量。此外,使用 CTE 仅在您手动编写时才有帮助,但使用 jinja 模板实际上会使您的生活更加艰难,因此不要强迫自己使用它们。

您将需要:

  • a jinja for 循环记录在案 here.
  • 要在 for 循环块中访问的 loop.last 特殊变量

我没有测试过以下代码,但我会这样写:

{% set var schema_names=('brand1', 'brand2', 'brand3') %}
{% for schema in schema_names %}
(
  select
    '{{ schema }}' as schema_name,
    col1,
    col2,
    col3
  from {{ schema }}.table_name
)
{% if not loop.last %}
union all
{% endif %}
{% endfor %}

如果您不想自己编写代码,dbt_utils 已经实现了此功能:union_relations

虽然,您可以参考它们的实现(比您需要的更复杂,因为它们处理更多的极端情况)会很有趣,您可以找到 here.