使用具有多个条件的 Dataform 增量更新 BigQuery table

Incremental update of BigQuery table using Dataform with multiple conditions

我在 Dataform 中使用以下代码创建了一个 BigQuery table:

config {
  type: "table",
  bigquery: {
    partitionBy: "DATE(timestamp)",
    clusterBy: ["itemName",'StoreName']
  }
}

SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
where storeName like "%XXXX%"

${when(incremental(), `where timestamp > (select max(timestamp) from ${self()})`)}

效果很好。但是,我想安排每日更新做以下事情:

config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(timestamp)",
    clusterBy: ["itemName",'StoreName']
  }
}

SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
where storeName like "%XXXX%"

${when(incremental(), `where timestamp > (select max(timestamp) from ${self()})`)}

但这不起作用,因为我收到以下错误:

因此,从错误消息中我意识到我所做的实际上创建了 2 个 where 语句。因此,我尝试在带有 AND 的增量子句中使用过滤器,但这也不起作用。我找到了 tables 的增量更新,称为 updatePartitionFilter ,但我不确定它是否可以用于此。

config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(timestamp)",
    updatePartitionFilter:
        "storeName like '%XXXX%'",
    clusterBy: ["itemName",'StoreName']
  }
}

SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
${when(incremental(), `where timestamp > (select max(timestamp) from ${self()})`)}

这样做正确吗?由于我每天都在更新,所以我无法在明天之前对此进行测试。我没时间等了。

我设法解决了这个问题。

updatePartitionFilter:
        "storeName like '%XXXX%'",

不是执行此操作的方法,因为它会影响分区,而不是选择的内容。

实际需要做的是在查询末尾移动 incremental 语句中的第二个子句:

config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(timestamp)",
    clusterBy: ["itemName",'StoreName']
  }
}

SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
${when(incremental(), `where timestamp > (select max(timestamp) from ${self()}) and storeName like '%XXXX%'`)}

事实上,需要在此处插入任何其他过滤器。

SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
${when(incremental(), `where timestamp > (select max(timestamp) from ${self()}) 
and storeName like '%XXXX%' 
and itemName = 'My little pony'`)}

为了建立在 Serge 的答案之上,${when(incremental(), 'WHERE ..,')} 块只是将指定的 WHERE 子句附加到已执行的 SQL。可在 here.

中找到解释如何编译执行的 SQL 的有用视频

在您的初始示例中,您已经指定了一个 WHERE 子句,因此这是添加第二个 WHERE 子句,导致执行的查询中意外找到关键字的错误WHERE.

Serge 提出的解决方案有效,因为这将添加 WHERE 子句,然后添加带有 AND 的额外条件,这(当递增 table 时)将编译以下:

INSERT INTO `<target_table>`
SELECT DATE(timestamp) AS Dates, *
FROM `<mytable in BQ>`
WHERE timestamp > (select max(timestamp) from `<target_table>`
AND storeName like "%XXXX%"

但是我相信这将无法在 <target_table> 的初始构建中包含 storeName 上的过滤器,因为当 table 尚不存在时,${when(incremental(),'WHERE ...')} 子句未附加。这将使编译后的 SQL 如下所示:

CREATE OR REPLACE TABLE `<target_table>` AS
SELECT DATE(timestamp) AS Dates, *
FROM `<mytable in BQ>`
-- NOTE NO `WHERE` CLAUSE

更好的方法是将 ${when(incremental(),...)} 逻辑中的 WHERE 更改为 AND,如下所示:

config {
  type: "incremental",
  bigquery: {
    partitionBy: "DATE(timestamp)",
    clusterBy: ["itemName",'StoreName']
  }
}

SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
where storeName like "%XXXX%"

${when(incremental(), `AND timestamp > (select max(timestamp) from ${self()})`)}

这将在最初创建 table 时使用 storeName 上的过滤器成功执行查询,然后对于任何后续运行将执行以下编译的 SQL 以增加:

INSERT INTO `<target_table>`
SELECT DATE(timestamp) AS Dates, *
FROM `<mytable in BQ>`
WHERE storeName like "%XXXX%"
AND timestamp > (select max(timestamp) from `<target_table>`

希望这是有道理的! (这是我对 SO 的第一次回应,对于任何不可靠的格式,我们深表歉意...)

编辑:修改了第二个代码块示例,因为我没有意识到 Serge 在 ${when(incremental(),...)} 配置中包含 storeName 上的过滤器。