从外部源使用 dbt 查询

Querying with dbt from external source

我有以下问题:

我已经通过创建存储集成设法做到了这一点,并且我已经使用我的角色(用于 运行ning dbt)手动创建了一个模式并评估了该模式的使用情况。到目前为止一切顺利。

然后我读到这个:

https://github.com/fishtown-analytics/dbt-external-tables

问题是这是 运行 正确的唯一方法,我不得不更改我的 dbt profiles.yml,将默认模式设置为 S3_MIXPANEL 和默认数据库 RAW_DEV, 运行 一个不同的目标和作用于 --target 'ingest_dev' 参数。

我一直认为应该有一个更复杂的解决方案,我可以在其中创建模式和查询元数据并使用类似 {{ source() }} 的东西,这样我就可以以某种方式指出我的文档这是一个外部源。我认为这个 dbt-external-tables 对于我这里的案例没有很好的解释?

任何人都可以帮助我并分享如何在不更改默认模式宏和 dbtprofiles.yml 的情况下正确地从外部阶段创建模式和查询吗?

我已经成功运行以下代码:

{{
  config(
    materialized ='incremental',
    schema = generate_schema_name('S3_MIXPANEL')
  )
}}
 
  SELECT
    metadata$filename as file_name,
    to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd') as event_date,
     as payload,
    CONVERT_TIMEZONE('Europe/London',TO_TIMESTAMP_tz(:properties:mp_processing_time_ms::int / 1000)) as  event_timestamp_converted,
    CONVERT_TIMEZONE('Europe/London', current_timestamp) as ingested_at

 from

    @my_s3_stage

    
{% if is_incremental() %}
    -- this filter will only be applied on an incremental run
    WHERE event_date>(
    SELECT
        max(event_date)
    FROM
        {{ this }}
    )
{% endif %}

{{ row_limit() }} 

编辑 22-06-20:

我已经在我的模型中添加了 src_mixpanel.yml 文件和 运行 dbt 命令,但是我还必须指定 data_types,所以我也添加了它们,然后我显然也必须在我的宏中添加“宏”(顺便说一句,这可能是一个愚蠢的问题,但我真的不知道如何安装你的包,所以我手动将你的所有宏添加到我的宏中)。

现在当我运行这个代码时:

dbt run-operation stage_external_sources

version: 2

sources:

  - name: s3_mixpanel
    database: RAW_DEV
    tables:
      - name: events
        external:
          location: '@my_s3_stage'
          auto_refresh: false # depends on your S3 setup
          partitions:
            - name: event_date
              expression: to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd')
              data_type: date
            - name: file_name
              expression: metadata$filename
              data_type: string
          columns:
            - name: properties
              data_type: variant

我得到一个错误:

Encountered an error while running operation: Compilation Error in macro stage_external_sources (macros/stage_external_sources.sql)
'dict object' has no attribute 'sources'

作为 dbt-external-tables 包的维护者,我将分享它的观点。该包认为您应该将所有外部源(S3 文件)作为外部 table 或使用 snowpipes first,在一个包含尽可能少的混淆逻辑的过程中。然后,您可以 select 从他们那里,作为 dbt 模型中的来源,以及所有必要的业务逻辑。

如果我的理解是正确的,您将在名为(例如)models/staging/mixpanel/src_mixpanel.yml:

的文件中暂存混合面板数据,如下所示
version: 2

sources:

  - name: s3_mixpanel
    database: raw_dev
    tables:
      - name: events
        external:
          location: '@my_s3_stage'
          file_format: "( type = json )"  # or a named file format
          auto_refresh: false # depends on your S3 setup
          partitions:
            - name: event_date
              expression: to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd')
          columns:
            - name: properties
              data_type: variant

您可以 运行 包中的这个宏来创建外部 table — 并且在创建之后,如果您没有启用 auto_refresh 则更新它的分区元数据(见雪花 docs):

dbt run-operation stage_external_sources

然后您可以 select 以增量模型的形式从此源中获取数据,就像上面的模型一样。现在,event_date 是此外部 table 上的分区列,因此对其进行过滤 应该 启用 Snowflake 以 p运行e 文件(尽管这已经对于动态的、子查询派生的过滤器,历史上不一致。

{{
  config(
    materialized ='incremental'
  )
}}
 
  SELECT
    metadata$filename as file_name,
    event_date,
    value as payload,
    properties:mp_processing_time_ms::int / 1000 as event_timestamp_converted,
    CONVERT_TIMEZONE('Europe/London', current_timestamp) as modeled_at

 from {{ source('s3_mixpanel', 'events' }} 

    
{% if is_incremental() %}
    -- this filter will only be applied on an incremental run
    WHERE event_date >(
    SELECT
        max(event_date)
    FROM
        {{ this }}
    )
{% endif %}

{{ row_limit() }}