具有结构字段的 dbt 中的增量模型
incremental model in dbt with struct fields
我正在为一个可能的用例评估 DBT,除了一种情况外,一切似乎都很好。这是当源 table 具有结构字段时。
我正在使用 Spark Thrift Server connector
,基础数据在 S3
中存储为 parquet
。 DBT 版本为 0.20
这是源代码的一部分table create sentence,你可以看到里面有struct fields。
CREATE TABLE `<someSchema>`.`<sourceTable>` (
`properties` STRUCT<`site`: STRING>,
`channel` STRING,
`timestamp` STRING,
`anotherDate` STRING,
`aDate ` STRING)
USING parquet
PARTITIONED BY (aDate)
LOCATION 's3a://<someBucket>'
我的模型只是在 table 上使用某些 where 子句执行 select。第一次是 运行,它工作得很好,它创建了一个 table 和原来的完全一样,只是做了一些小的改动,正如预期的那样,即使有结构字段。
这里是一块水槽table创建table
CREATE TABLE `<someSchema>`.`dbtsink` (
`properties` STRUCT<`site`: STRING>,
`channel` STRING,
`timestamp` STRING,
`anotherDate ` STRING,
`aDate` STRING)
USING parquet
PARTITIONED BY (anotherDate)
当我 运行 dbt 再次使用 where 子句中的一些其他值时,我的问题就来了,它应该在接收器 table 中创建另一个分区。查询编译就好了
它引发了这个错误:
Runtime Error in model dbtsink (models/anotherDate/dbtsink.sql)
Database Error
Error running query: org.apache.spark.sql.AnalysisException: cannot resolve '`site`' given input columns: [dbtsink__dbt_tmp.channel, dbtsink__dbt_tmp.anotherDate, dbtsink__dbt_tmp.aDate, dbtsink__dbt_tmp.properties, dbtsink__dbt_tmp.timestamp]; line 4 pos 25;
'InsertIntoStatement 'UnresolvedRelation [someSchema, dbtsink], false, false
+- 'Project [properties#6526, 'site, channel#6527, timestamp#6528, aDate#6541, anotherDate#6540]
+- SubqueryAlias dbtsink__dbt_tmp
+- Project [properties#6526, channel#6527, timestamp#6528, anotherDate#6540, aDate#6541]
+- Filter (((aDate#6541 > 2021060100) AND (aDate#6541 <= 2021070609)) AND (anotherDate#6540 = 2021070609))
+- SubqueryAlias spark_catalog.someSchema.sourceTable
+- Relation[context#6524,traits#6525,properties#6526,channel#6527,timestamp#6528,projectId#6529,integrations#6530,messageId#6531,originalTimestamp#6532,receivedAt#6533,sentAt#6534,userId#6535,anonymousId#6536,type#6537,providerId#6538,version#6539,anotherDate#6540,aDate#6541] parquet
它似乎在尝试读取或写入结构的内部字段作为根字段。我用其他结构字段进行了测试,结果与第一次执行时一样,我只想要结构本身。正如我所说,它只发生在第二次执行中。
这是我模型的查询,很简单
select
properties,
channel,
timestamp,
anotherDate,
aDate
from {{ source('someSchema', 'sourceTable') }}
where aDate > '{{ var("aDateLowerLimit") }}' and aDate <= '{{ var("aDateUpperLimit") }}'
and anotherDate = '{{ var("anotherDate") }}'
如果我更改 select 以使用 to_json(properties)
将属性从结构转换为 json,它会按预期工作,生成一个新分区。
如果 DBT 中的结构有问题?我做错了什么?
我正在使用增量实现,我用 append 和 n insert_overwrite 测试了它,这似乎不是问题所在
问题与 DBT 如何尝试使用 REGEX. See the parse_columns_from_information 函数解析 Spark table 中的列有关。
您不能使用 REGEX 来解析 table 架构。该函数正在使用此 Spark SQL 语句提供的结果:show table extended in someSchema like '*'
。使用该语句时,您的 table 架构会得到类似这样的内容:
Schema: root
|-- properties: struct (nullable = true)
| |-- site: string (nullable = true)
|-- channel: string (nullable = true)
|-- timestamp: string (nullable = true)
|-- anotherDate: string (nullable = true)
|-- aDate: string (nullable = true)
如您所见,将 REGEX 应用于上述字符串会弄乱您的列。
您可以使用 parse_describe_extended 解决此问题。此函数使用此 Spark SQL 语句提供的结果:describe extended someSchema.dbtsink
。为了使用
parse_describe_extended
您需要禁用 DBT 缓存(这可能有害)要禁用 DBT 缓存,您可以使用此 dbt 参数:--bypass-cache
.
我正在为一个可能的用例评估 DBT,除了一种情况外,一切似乎都很好。这是当源 table 具有结构字段时。
我正在使用 Spark Thrift Server connector
,基础数据在 S3
中存储为 parquet
。 DBT 版本为 0.20
这是源代码的一部分table create sentence,你可以看到里面有struct fields。
CREATE TABLE `<someSchema>`.`<sourceTable>` (
`properties` STRUCT<`site`: STRING>,
`channel` STRING,
`timestamp` STRING,
`anotherDate` STRING,
`aDate ` STRING)
USING parquet
PARTITIONED BY (aDate)
LOCATION 's3a://<someBucket>'
我的模型只是在 table 上使用某些 where 子句执行 select。第一次是 运行,它工作得很好,它创建了一个 table 和原来的完全一样,只是做了一些小的改动,正如预期的那样,即使有结构字段。
这里是一块水槽table创建table
CREATE TABLE `<someSchema>`.`dbtsink` (
`properties` STRUCT<`site`: STRING>,
`channel` STRING,
`timestamp` STRING,
`anotherDate ` STRING,
`aDate` STRING)
USING parquet
PARTITIONED BY (anotherDate)
当我 运行 dbt 再次使用 where 子句中的一些其他值时,我的问题就来了,它应该在接收器 table 中创建另一个分区。查询编译就好了
它引发了这个错误:
Runtime Error in model dbtsink (models/anotherDate/dbtsink.sql)
Database Error
Error running query: org.apache.spark.sql.AnalysisException: cannot resolve '`site`' given input columns: [dbtsink__dbt_tmp.channel, dbtsink__dbt_tmp.anotherDate, dbtsink__dbt_tmp.aDate, dbtsink__dbt_tmp.properties, dbtsink__dbt_tmp.timestamp]; line 4 pos 25;
'InsertIntoStatement 'UnresolvedRelation [someSchema, dbtsink], false, false
+- 'Project [properties#6526, 'site, channel#6527, timestamp#6528, aDate#6541, anotherDate#6540]
+- SubqueryAlias dbtsink__dbt_tmp
+- Project [properties#6526, channel#6527, timestamp#6528, anotherDate#6540, aDate#6541]
+- Filter (((aDate#6541 > 2021060100) AND (aDate#6541 <= 2021070609)) AND (anotherDate#6540 = 2021070609))
+- SubqueryAlias spark_catalog.someSchema.sourceTable
+- Relation[context#6524,traits#6525,properties#6526,channel#6527,timestamp#6528,projectId#6529,integrations#6530,messageId#6531,originalTimestamp#6532,receivedAt#6533,sentAt#6534,userId#6535,anonymousId#6536,type#6537,providerId#6538,version#6539,anotherDate#6540,aDate#6541] parquet
它似乎在尝试读取或写入结构的内部字段作为根字段。我用其他结构字段进行了测试,结果与第一次执行时一样,我只想要结构本身。正如我所说,它只发生在第二次执行中。
这是我模型的查询,很简单
select
properties,
channel,
timestamp,
anotherDate,
aDate
from {{ source('someSchema', 'sourceTable') }}
where aDate > '{{ var("aDateLowerLimit") }}' and aDate <= '{{ var("aDateUpperLimit") }}'
and anotherDate = '{{ var("anotherDate") }}'
如果我更改 select 以使用 to_json(properties)
将属性从结构转换为 json,它会按预期工作,生成一个新分区。
如果 DBT 中的结构有问题?我做错了什么?
我正在使用增量实现,我用 append 和 n insert_overwrite 测试了它,这似乎不是问题所在
问题与 DBT 如何尝试使用 REGEX. See the parse_columns_from_information 函数解析 Spark table 中的列有关。
您不能使用 REGEX 来解析 table 架构。该函数正在使用此 Spark SQL 语句提供的结果:show table extended in someSchema like '*'
。使用该语句时,您的 table 架构会得到类似这样的内容:
Schema: root
|-- properties: struct (nullable = true)
| |-- site: string (nullable = true)
|-- channel: string (nullable = true)
|-- timestamp: string (nullable = true)
|-- anotherDate: string (nullable = true)
|-- aDate: string (nullable = true)
如您所见,将 REGEX 应用于上述字符串会弄乱您的列。
您可以使用 parse_describe_extended 解决此问题。此函数使用此 Spark SQL 语句提供的结果:describe extended someSchema.dbtsink
。为了使用
parse_describe_extended
您需要禁用 DBT 缓存(这可能有害)要禁用 DBT 缓存,您可以使用此 dbt 参数:--bypass-cache
.