如何在 DBT 中编写增量模型?
How do you write incremental models in DBT?
我有两个大的 table 我正在尝试使用 dbt 加入和过滤。
这个SQL很简单,大致是:
SELECT
u.user_id, t.transaction_id
FROM users u
JOIN transactions t ON t.user_id = u.user_id
WHERE u.active = 1
目前我正在使用“table”具体化,但这相当浪费,因为 table 底层 table 与 [=27] 有 99.99% 相同=] 运行.
但是,我无法从 DBT 文档中了解到如何将此模型设置为“增量”。
有什么想法吗?
PS。我 运行 正在 SQL 服务器上。
如果您有日期字段,您可以使用它来仅加载最后的数据。
IE。你有 transaction_date 列。
{{ config(
materialized='incremental',
as_columnstore=false,
pre_hook="""
{% if is_incremental() %}
delete from {{this}}
where transaction_date >= '{{ (modules.datetime.datetime.now() - modules.datetime.timedelta(2)).isoformat() }}'
{% endif %}
"""
)}}
SELECT
u.user_id, t.transaction_id
FROM users u
JOIN transactions t ON t.user_id = u.user_id
WHERE u.active = 1
{% if is_incremental() %}
and transaction_date >= '{{ (modules.datetime.datetime.now() - modules.datetime.timedelta(2)).isoformat() }}'
{% endif %}
第一次运行这个模型时,它会忽略“if is_incremental()”下的所有代码。所有即将到来的 运行 它将清理过去两天的交易并重新加载它们。
正如@anders-swanson 在他的评论中所写,如果 transaction_id
绝对是独一无二的,您可以将其设置为 unique_key
并将您的模型具体化为增量 table。
dbt's docs 解释如何做到这一点。使用您的示例,它可能是:
{{
config(
materialized='incremental',
unique_key='transaction_id'
)
}}
select
u.user_id, t.transaction_id
from users u
join transactions t ON t.user_id = u.user_id
where u.active = 1
如果 transaction_id
不是唯一的,但 transaction_id
||user_id
是唯一的,您可以尝试创建一个新列,将这些列连接到上游 dbt 模型中,然后分配给它作为 unique_key
:
{{
config(
materialized='incremental',
unique_key='pkey'
)
}}
select
u.user_id,
t.transaction_id,
u.user_id||t.transaction_id as pkey
from users u
join transactions t ON t.user_id = u.user_id
where u.active = 1
否则,您必须拉入一个列,该列要么是 a) 唯一的,要么 b) 具有可用于应用 is_incremental()
过滤器的有序质量(如@viacheslav-nefedov 写道).
我有两个大的 table 我正在尝试使用 dbt 加入和过滤。
这个SQL很简单,大致是:
SELECT
u.user_id, t.transaction_id
FROM users u
JOIN transactions t ON t.user_id = u.user_id
WHERE u.active = 1
目前我正在使用“table”具体化,但这相当浪费,因为 table 底层 table 与 [=27] 有 99.99% 相同=] 运行.
但是,我无法从 DBT 文档中了解到如何将此模型设置为“增量”。
有什么想法吗?
PS。我 运行 正在 SQL 服务器上。
如果您有日期字段,您可以使用它来仅加载最后的数据。 IE。你有 transaction_date 列。
{{ config(
materialized='incremental',
as_columnstore=false,
pre_hook="""
{% if is_incremental() %}
delete from {{this}}
where transaction_date >= '{{ (modules.datetime.datetime.now() - modules.datetime.timedelta(2)).isoformat() }}'
{% endif %}
"""
)}}
SELECT
u.user_id, t.transaction_id
FROM users u
JOIN transactions t ON t.user_id = u.user_id
WHERE u.active = 1
{% if is_incremental() %}
and transaction_date >= '{{ (modules.datetime.datetime.now() - modules.datetime.timedelta(2)).isoformat() }}'
{% endif %}
第一次运行这个模型时,它会忽略“if is_incremental()”下的所有代码。所有即将到来的 运行 它将清理过去两天的交易并重新加载它们。
正如@anders-swanson 在他的评论中所写,如果 transaction_id
绝对是独一无二的,您可以将其设置为 unique_key
并将您的模型具体化为增量 table。
dbt's docs 解释如何做到这一点。使用您的示例,它可能是:
{{
config(
materialized='incremental',
unique_key='transaction_id'
)
}}
select
u.user_id, t.transaction_id
from users u
join transactions t ON t.user_id = u.user_id
where u.active = 1
如果 transaction_id
不是唯一的,但 transaction_id
||user_id
是唯一的,您可以尝试创建一个新列,将这些列连接到上游 dbt 模型中,然后分配给它作为 unique_key
:
{{
config(
materialized='incremental',
unique_key='pkey'
)
}}
select
u.user_id,
t.transaction_id,
u.user_id||t.transaction_id as pkey
from users u
join transactions t ON t.user_id = u.user_id
where u.active = 1
否则,您必须拉入一个列,该列要么是 a) 唯一的,要么 b) 具有可用于应用 is_incremental()
过滤器的有序质量(如@viacheslav-nefedov 写道).