如何使用 DBT 在 BigQuery 中对表进行分区

How do you partition tables in BigQuery using DBT

我是 DBT 的新手,之前一直在使用 Airflow 进行数据转换。

在 Airflow 中有一个名为 {{ ds }} 的变量,它代表这种形式的逻辑日期 YYYY-MM-DD{{ ds_nodash }} 代表这种形式的逻辑日期 YYYYMMDD。然后我可以设置一个类似这样的任务:

my_task = BigQueryOperator(
  task_id='t_my_task',
  sql= """ SELECT * FROM my_table where my_date="{{ ds }}" """,
  destination_dataset_table='my_project.my_dataset.my_table_new${{ ds_nodash }}',
  write_disposition='WRITE_TRUNCATE',
  dag=dag
)

这意味着我是 运行 第三行给出的 SQL 查询,这将覆盖第四行的 table。在 Airflow 界面中,如果我重新运行只说“2022-01-11”这一天,那么它会在该日期自动覆盖该分区。

我正在尝试找出如何在 DBT 中执行相同的操作。

使用 DBT,您可以使用 incremental model.

在 dbt 中,您使用 SQL 语句描述您想要的数据,您选择的 实现 决定了数据在您的仓库中的实现方式:

  • 短暂实现:CTE 将是 short-lived,
  • table 物化:一个 BQ table,但是每次你 re-run 你的 DBT 项目它都会被删除并且 re-created(不是 suitable 大tables)
  • 视图具体化:视图...
  • 和增量模型

对于增量模型,基本上 table 您想要插入新行的地方很大。此具体化允许您添加规则,例如“从 table source_table 插入行,其中时间戳 >= 今天”。

在您的情况下,使用 DBT + BigQuery,您有 2 个选项:mergeinsert+ overwrite,但根据您的描述,您会希望使用后者。

您需要在模型的开头包含此内容:

{{
  config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    ...
  )
}}

作为参考,您可以前往 there and there