利用宏在 bigquery 的多个模式上不再在 dbt 中删除旧关系

Utilizing macro to drop old relations no longer in dbt on multiple schemas on bigquery

来源和灵感在这里: Clean your warehouse of old and deprecated models

正在尝试转换以下 dbt 宏来做两件事:

  1. 运行 关于 bigquery
  2. 从我的多个目标模式中删除“旧”关系

我的模式查询输出如下:

SELECT schema_name FROM `my-project.region-us.INFORMATION_SCHEMA.SCHEMATA` order by schema_name desc;

schema_name
dbt_dev
dbt_dev_stage
dbt_dev_mart
dbt_dev_analytics
dbt_prod
dbt_prod_stage
dbt_prod_mart
dbt_prod_analytics
etc...

我的“调整后”宏类似于:

{% macro drop_old_relations(dryrun=False) %}

{% if execute %}
  {% set current_models=[] %}

  {% for node in graph.nodes.values()
     | selectattr("resource_type", "in", ["model", "seed", "snapshot"])%}
    {% do current_models.append(node.name) %}

  {% endfor %}
{% endif %}

{% set cleanup_query %}

      WITH MODELS_TO_DROP AS (
          SELECT
            CASE 
              WHEN TABLE_TYPE = 'BASE TABLE' THEN 'TABLE'
              WHEN TABLE_TYPE = 'VIEW' THEN 'VIEW'
            END AS RELATION_TYPE,
            CONCAT( TABLE_CATALOG,".",{{ target.schema }},".", TABLE_NAME) AS RELATION_NAME
          FROM 
            {{ target.database }}.{{ target.schema }}.INFORMATION_SCHEMA.TABLES
          WHERE TABLE_SCHEMA = {{ target.schema }}
            AND TABLE_NAME NOT IN
              ({%- for model in current_models -%}
                  '{{ model.upper() }}'
                  {%- if not loop.last -%}
                      ,
                  {% endif %}
              {%- endfor -%})) 
      SELECT 
        'DROP ' || RELATION_TYPE || ' ' || RELATION_NAME || ';' as DROP_COMMANDS
      FROM 
        MODELS_TO_DROP
  {% endset %}

{% do log(cleanup_query, info=True) %}
{% set drop_commands = run_query(cleanup_query).columns[0].values() %}

{% if drop_commands %}
  {% if dryrun | as_bool == False %}
    {% do log('Executing DROP commands...', True) %}
  {% else %}
    {% do log('Printing DROP commands...', True) %}
  {% endif %}
  {% for drop_command in drop_commands %}
    {% do log(drop_command, True) %}
    {% if dryrun | as_bool == False %}
      {% do run_query(drop_command) %}
    {% endif %}
  {% endfor %}
{% else %}
  {% do log('No relations to clean.', True) %}
{% endif %}

{%- endmacro -%}

我目前运行遇到宏无法识别我的一些目标模式的问题:

dbt run-operation drop_old_relations --args "{dryrun: True}"

Encountered an error while running operation: Database Error
  Unrecognized name: dbt_dev at [14:32]

或者我很乐意走模式参数之类的路线,然后将模式作为 运行-hook 进行迭代,例如:

dbt run-operation drop_old_relations --args "{schema: dbt_dev_mart, dryrun: True}"

on-run-start:
 - "{% for schema in schemas%}drop_old_relations({{ schema }},False);{% endfor%}"

问得好,我喜欢看到人们调整这种不同的仓库。我将根据你的问题将我的回答分为两部分。

1.让它在 BigQuery 上运行 我能够首先在我的目标架构上运行它。看起来我们只需要调整 cleanup_query.

中的一些 SQL

drop_old_relations.sql

{% macro drop_old_relations(schema=target.schema, dryrun=False) %}

{# Get the models that currently exist in dbt #}
{% if execute %}
  {% set current_models=[] %}

  {% for node in graph.nodes.values()
     | selectattr("resource_type", "in", ["model", "seed", "snapshot"])%}
        {% do current_models.append(node.name) %}
    
  {% endfor %}
{% endif %}

{# Run a query to create the drop statements for all relations in BQ that are NOT in the dbt project #}
{% set cleanup_query %}

      WITH MODELS_TO_DROP AS (
          SELECT
            CASE 
              WHEN TABLE_TYPE = 'BASE TABLE' THEN 'TABLE'
              WHEN TABLE_TYPE = 'VIEW' THEN 'VIEW'
            END AS RELATION_TYPE,
            CONCAT('{{ schema }}','.',TABLE_NAME) AS RELATION_NAME
          FROM 
            {{ schema }}.INFORMATION_SCHEMA.TABLES
          WHERE TABLE_SCHEMA = '{{ schema }}'
            AND UPPER(TABLE_NAME) NOT IN
              ({%- for model in current_models -%}
                  '{{ model.upper() }}'
                  {%- if not loop.last -%}
                      ,
                  {% endif %}
              {%- endfor -%})) 
      SELECT 
        'DROP ' || RELATION_TYPE || ' ' || RELATION_NAME || ';' as DROP_COMMANDS
      FROM 
        MODELS_TO_DROP
  {% endset %}

{% set drop_commands = run_query(cleanup_query).columns[0].values() %}

{# Execute each of the drop commands for each relation #}

{% if drop_commands %}
  {% if dryrun | as_bool == False %}
    {% do log('Executing DROP commands...', True) %}
  {% else %}
    {% do log('Printing DROP commands...', True) %}
  {% endif %}
  {% for drop_command in drop_commands %}
    {% do log(drop_command, True) %}
    {% if dryrun | as_bool == False %}
      {% do run_query(drop_command) %}
    {% endif %}
  {% endfor %}
{% else %}
  {% do log('No relations to clean.', True) %}
{% endif %}

{%- endmacro -%}

2。清理多个模式 与其调整上面的宏来清除所有模式,我认为使用另一个宏来获取所有模式然后遍历每个模式并调用上面的宏可能会有所帮助。

clean_all_schemas.sql

{% macro clean_all_schemas() %}

  {% set get_schemas_query %}
      SELECT schema_name FROM `{{ target.project }}.region-{{ target.location }}.INFORMATION_SCHEMA.SCHEMATA` order by schema_name desc;
  {% endset %}

  {% set schemas = run_query(get_schemas_query).columns[0].values() %}

  {% for schema in schemas %}
    {% do log("Cleaning up " + schema + " schema", True) %}
    {{ drop_old_relations(schema=schema) }}
  {% endfor %}

{% endmacro %}%}

命令

  • dbt run-operation drop_old_relations - 删除目标模式中的旧关系
  • dbt run-operation clean_all_schemas - 删除目标 BQ 项目中跨模式的旧关系。

让我们知道这是否适用于您的用例:-)