具有结构字段的 dbt 中的增量模型

incremental model in dbt with struct fields

我正在为一个可能的用例评估 DBT,除了一种情况外,一切似乎都很好。这是当源 table 具有结构字段时。

我正在使用 Spark Thrift Server connector,基础数据在 S3 中存储为 parquet。 DBT 版本为 0.20

这是源代码的一部分table create sentence,你可以看到里面有struct fields。

CREATE TABLE `<someSchema>`.`<sourceTable>` (
  `properties` STRUCT<`site`: STRING>,
  `channel` STRING,
  `timestamp` STRING,
  `anotherDate` STRING,
  `aDate ` STRING)
  USING parquet
  PARTITIONED BY (aDate)
  LOCATION 's3a://<someBucket>'

我的模型只是在 table 上使用某些 where 子句执行 select。第一次是 运行,它工作得很好,它创建了一个 table 和原来的完全一样,只是做了一些小的改动,正如预期的那样,即使有结构字段。

这里是一块水槽table创建table

CREATE TABLE `<someSchema>`.`dbtsink` (
      `properties` STRUCT<`site`: STRING>,
      `channel` STRING,
      `timestamp` STRING,
      `anotherDate ` STRING,
      `aDate` STRING)
USING parquet
PARTITIONED BY (anotherDate)

当我 运行 dbt 再次使用 where 子句中的一些其他值时,我的问题就来了,它应该在接收器 table 中创建另一个分区。查询编译就好了

它引发了这个错误:

Runtime Error in model dbtsink (models/anotherDate/dbtsink.sql)
  Database Error
    Error running query: org.apache.spark.sql.AnalysisException: cannot resolve '`site`' given input columns: [dbtsink__dbt_tmp.channel, dbtsink__dbt_tmp.anotherDate, dbtsink__dbt_tmp.aDate, dbtsink__dbt_tmp.properties, dbtsink__dbt_tmp.timestamp]; line 4 pos 25;
    'InsertIntoStatement 'UnresolvedRelation [someSchema, dbtsink], false, false
    +- 'Project [properties#6526, 'site, channel#6527, timestamp#6528, aDate#6541, anotherDate#6540]
       +- SubqueryAlias dbtsink__dbt_tmp
          +- Project [properties#6526, channel#6527, timestamp#6528, anotherDate#6540, aDate#6541]
             +- Filter (((aDate#6541 > 2021060100) AND (aDate#6541 <= 2021070609)) AND (anotherDate#6540 = 2021070609))
                +- SubqueryAlias spark_catalog.someSchema.sourceTable
                   +- Relation[context#6524,traits#6525,properties#6526,channel#6527,timestamp#6528,projectId#6529,integrations#6530,messageId#6531,originalTimestamp#6532,receivedAt#6533,sentAt#6534,userId#6535,anonymousId#6536,type#6537,providerId#6538,version#6539,anotherDate#6540,aDate#6541] parquet

它似乎在尝试读取或写入结构的内部字段作为根字段。我用其他结构字段进行了测试,结果与第一次执行时一样,我只想要结构本身。正如我所说,它只发生在第二次执行中。

这是我模型的查询,很简单

select 
properties,
channel,
timestamp,
anotherDate,
aDate
from {{ source('someSchema', 'sourceTable') }}
where aDate > '{{ var("aDateLowerLimit") }}' and aDate <= '{{ var("aDateUpperLimit") }}'
and anotherDate = '{{ var("anotherDate") }}'

如果我更改 select 以使用 to_json(properties) 将属性从结构转换为 json,它会按预期工作,生成一个新分区。

如果 DBT 中的结构有问题?我做错了什么?

我正在使用增量实现,我用 append 和 n insert_overwrite 测试了它,这似乎不是问题所在

问题与 DBT 如何尝试使用 REGEX. See the parse_columns_from_information 函数解析 Spark table 中的列有关。

您不能使用 REGEX 来解析 table 架构。该函数正在使用此 Spark SQL 语句提供的结果:show table extended in someSchema like '*'。使用该语句时,您的 table 架构会得到类似这样的内容:

Schema: root
 |-- properties: struct (nullable = true)
 |    |-- site: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- anotherDate: string (nullable = true)
 |-- aDate: string (nullable = true)

如您所见,将 REGEX 应用于上述字符串会弄乱您的列。

您可以使用 parse_describe_extended 解决此问题。此函数使用此 Spark SQL 语句提供的结果:describe extended someSchema.dbtsink。为了使用 parse_describe_extended 您需要禁用 DBT 缓存(这可能有害)要禁用 DBT 缓存,您可以使用此 dbt 参数:--bypass-cache.