从外部源使用 dbt 查询
Querying with dbt from external source
我有以下问题:
- 我有一个 AWS S3 管道,每天都会吐出一个 json.gz 个文件。
- 我希望用 dbt 获取该文件并将其放入 snowflake(没有 snowpipe 使用 atm)
我已经通过创建存储集成设法做到了这一点,并且我已经使用我的角色(用于 运行ning dbt)手动创建了一个模式并评估了该模式的使用情况。到目前为止一切顺利。
然后我读到这个:
https://github.com/fishtown-analytics/dbt-external-tables
问题是这是 运行 正确的唯一方法,我不得不更改我的 dbt profiles.yml,将默认模式设置为 S3_MIXPANEL 和默认数据库 RAW_DEV, 运行 一个不同的目标和作用于 --target 'ingest_dev' 参数。
我一直认为应该有一个更复杂的解决方案,我可以在其中创建模式和查询元数据并使用类似 {{ source() }} 的东西,这样我就可以以某种方式指出我的文档这是一个外部源。我认为这个 dbt-external-tables 对于我这里的案例没有很好的解释?
任何人都可以帮助我并分享如何在不更改默认模式宏和 dbtprofiles.yml 的情况下正确地从外部阶段创建模式和查询吗?
我已经成功运行以下代码:
{{
config(
materialized ='incremental',
schema = generate_schema_name('S3_MIXPANEL')
)
}}
SELECT
metadata$filename as file_name,
to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd') as event_date,
as payload,
CONVERT_TIMEZONE('Europe/London',TO_TIMESTAMP_tz(:properties:mp_processing_time_ms::int / 1000)) as event_timestamp_converted,
CONVERT_TIMEZONE('Europe/London', current_timestamp) as ingested_at
from
@my_s3_stage
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
WHERE event_date>(
SELECT
max(event_date)
FROM
{{ this }}
)
{% endif %}
{{ row_limit() }}
编辑 22-06-20:
我已经在我的模型中添加了 src_mixpanel.yml 文件和 运行 dbt 命令,但是我还必须指定 data_types,所以我也添加了它们,然后我显然也必须在我的宏中添加“宏”(顺便说一句,这可能是一个愚蠢的问题,但我真的不知道如何安装你的包,所以我手动将你的所有宏添加到我的宏中)。
现在当我运行这个代码时:
dbt run-operation stage_external_sources
和
version: 2
sources:
- name: s3_mixpanel
database: RAW_DEV
tables:
- name: events
external:
location: '@my_s3_stage'
auto_refresh: false # depends on your S3 setup
partitions:
- name: event_date
expression: to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd')
data_type: date
- name: file_name
expression: metadata$filename
data_type: string
columns:
- name: properties
data_type: variant
我得到一个错误:
Encountered an error while running operation: Compilation Error in macro stage_external_sources (macros/stage_external_sources.sql)
'dict object' has no attribute 'sources'
作为 dbt-external-tables
包的维护者,我将分享它的观点。该包认为您应该将所有外部源(S3 文件)作为外部 table 或使用 snowpipes first,在一个包含尽可能少的混淆逻辑的过程中。然后,您可以 select 从他们那里,作为 dbt 模型中的来源,以及所有必要的业务逻辑。
如果我的理解是正确的,您将在名为(例如)models/staging/mixpanel/src_mixpanel.yml:
的文件中暂存混合面板数据,如下所示
version: 2
sources:
- name: s3_mixpanel
database: raw_dev
tables:
- name: events
external:
location: '@my_s3_stage'
file_format: "( type = json )" # or a named file format
auto_refresh: false # depends on your S3 setup
partitions:
- name: event_date
expression: to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd')
columns:
- name: properties
data_type: variant
您可以 运行 包中的这个宏来创建外部 table — 并且在创建之后,如果您没有启用 auto_refresh
则更新它的分区元数据(见雪花 docs):
dbt run-operation stage_external_sources
然后您可以 select 以增量模型的形式从此源中获取数据,就像上面的模型一样。现在,event_date
是此外部 table 上的分区列,因此对其进行过滤 应该 启用 Snowflake 以 p运行e 文件(尽管这已经对于动态的、子查询派生的过滤器,历史上不一致。
{{
config(
materialized ='incremental'
)
}}
SELECT
metadata$filename as file_name,
event_date,
value as payload,
properties:mp_processing_time_ms::int / 1000 as event_timestamp_converted,
CONVERT_TIMEZONE('Europe/London', current_timestamp) as modeled_at
from {{ source('s3_mixpanel', 'events' }}
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
WHERE event_date >(
SELECT
max(event_date)
FROM
{{ this }}
)
{% endif %}
{{ row_limit() }}
我有以下问题:
- 我有一个 AWS S3 管道,每天都会吐出一个 json.gz 个文件。
- 我希望用 dbt 获取该文件并将其放入 snowflake(没有 snowpipe 使用 atm)
我已经通过创建存储集成设法做到了这一点,并且我已经使用我的角色(用于 运行ning dbt)手动创建了一个模式并评估了该模式的使用情况。到目前为止一切顺利。
然后我读到这个:
https://github.com/fishtown-analytics/dbt-external-tables
问题是这是 运行 正确的唯一方法,我不得不更改我的 dbt profiles.yml,将默认模式设置为 S3_MIXPANEL 和默认数据库 RAW_DEV, 运行 一个不同的目标和作用于 --target 'ingest_dev' 参数。
我一直认为应该有一个更复杂的解决方案,我可以在其中创建模式和查询元数据并使用类似 {{ source() }} 的东西,这样我就可以以某种方式指出我的文档这是一个外部源。我认为这个 dbt-external-tables 对于我这里的案例没有很好的解释?
任何人都可以帮助我并分享如何在不更改默认模式宏和 dbtprofiles.yml 的情况下正确地从外部阶段创建模式和查询吗?
我已经成功运行以下代码:
{{
config(
materialized ='incremental',
schema = generate_schema_name('S3_MIXPANEL')
)
}}
SELECT
metadata$filename as file_name,
to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd') as event_date,
as payload,
CONVERT_TIMEZONE('Europe/London',TO_TIMESTAMP_tz(:properties:mp_processing_time_ms::int / 1000)) as event_timestamp_converted,
CONVERT_TIMEZONE('Europe/London', current_timestamp) as ingested_at
from
@my_s3_stage
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
WHERE event_date>(
SELECT
max(event_date)
FROM
{{ this }}
)
{% endif %}
{{ row_limit() }}
编辑 22-06-20:
我已经在我的模型中添加了 src_mixpanel.yml 文件和 运行 dbt 命令,但是我还必须指定 data_types,所以我也添加了它们,然后我显然也必须在我的宏中添加“宏”(顺便说一句,这可能是一个愚蠢的问题,但我真的不知道如何安装你的包,所以我手动将你的所有宏添加到我的宏中)。
现在当我运行这个代码时:
dbt run-operation stage_external_sources
和
version: 2
sources:
- name: s3_mixpanel
database: RAW_DEV
tables:
- name: events
external:
location: '@my_s3_stage'
auto_refresh: false # depends on your S3 setup
partitions:
- name: event_date
expression: to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd')
data_type: date
- name: file_name
expression: metadata$filename
data_type: string
columns:
- name: properties
data_type: variant
我得到一个错误:
Encountered an error while running operation: Compilation Error in macro stage_external_sources (macros/stage_external_sources.sql)
'dict object' has no attribute 'sources'
作为 dbt-external-tables
包的维护者,我将分享它的观点。该包认为您应该将所有外部源(S3 文件)作为外部 table 或使用 snowpipes first,在一个包含尽可能少的混淆逻辑的过程中。然后,您可以 select 从他们那里,作为 dbt 模型中的来源,以及所有必要的业务逻辑。
如果我的理解是正确的,您将在名为(例如)models/staging/mixpanel/src_mixpanel.yml:
的文件中暂存混合面板数据,如下所示version: 2
sources:
- name: s3_mixpanel
database: raw_dev
tables:
- name: events
external:
location: '@my_s3_stage'
file_format: "( type = json )" # or a named file format
auto_refresh: false # depends on your S3 setup
partitions:
- name: event_date
expression: to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd')
columns:
- name: properties
data_type: variant
您可以 运行 包中的这个宏来创建外部 table — 并且在创建之后,如果您没有启用 auto_refresh
则更新它的分区元数据(见雪花 docs):
dbt run-operation stage_external_sources
然后您可以 select 以增量模型的形式从此源中获取数据,就像上面的模型一样。现在,event_date
是此外部 table 上的分区列,因此对其进行过滤 应该 启用 Snowflake 以 p运行e 文件(尽管这已经对于动态的、子查询派生的过滤器,历史上不一致。
{{
config(
materialized ='incremental'
)
}}
SELECT
metadata$filename as file_name,
event_date,
value as payload,
properties:mp_processing_time_ms::int / 1000 as event_timestamp_converted,
CONVERT_TIMEZONE('Europe/London', current_timestamp) as modeled_at
from {{ source('s3_mixpanel', 'events' }}
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
WHERE event_date >(
SELECT
max(event_date)
FROM
{{ this }}
)
{% endif %}
{{ row_limit() }}