使用具有多个条件的 Dataform 增量更新 BigQuery table
Incremental update of BigQuery table using Dataform with multiple conditions
我在 Dataform 中使用以下代码创建了一个 BigQuery table:
config {
type: "table",
bigquery: {
partitionBy: "DATE(timestamp)",
clusterBy: ["itemName",'StoreName']
}
}
SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
where storeName like "%XXXX%"
${when(incremental(), `where timestamp > (select max(timestamp) from ${self()})`)}
效果很好。但是,我想安排每日更新做以下事情:
config {
type: "incremental",
bigquery: {
partitionBy: "DATE(timestamp)",
clusterBy: ["itemName",'StoreName']
}
}
SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
where storeName like "%XXXX%"
${when(incremental(), `where timestamp > (select max(timestamp) from ${self()})`)}
但这不起作用,因为我收到以下错误:
因此,从错误消息中我意识到我所做的实际上创建了 2 个 where 语句。因此,我尝试在带有 AND 的增量子句中使用过滤器,但这也不起作用。我找到了 tables 的增量更新,称为 updatePartitionFilter
,但我不确定它是否可以用于此。
config {
type: "incremental",
bigquery: {
partitionBy: "DATE(timestamp)",
updatePartitionFilter:
"storeName like '%XXXX%'",
clusterBy: ["itemName",'StoreName']
}
}
SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
${when(incremental(), `where timestamp > (select max(timestamp) from ${self()})`)}
这样做正确吗?由于我每天都在更新,所以我无法在明天之前对此进行测试。我没时间等了。
我设法解决了这个问题。
updatePartitionFilter:
"storeName like '%XXXX%'",
不是执行此操作的方法,因为它会影响分区,而不是选择的内容。
实际需要做的是在查询末尾移动 incremental
语句中的第二个子句:
config {
type: "incremental",
bigquery: {
partitionBy: "DATE(timestamp)",
clusterBy: ["itemName",'StoreName']
}
}
SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
${when(incremental(), `where timestamp > (select max(timestamp) from ${self()}) and storeName like '%XXXX%'`)}
事实上,需要在此处插入任何其他过滤器。
SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
${when(incremental(), `where timestamp > (select max(timestamp) from ${self()})
and storeName like '%XXXX%'
and itemName = 'My little pony'`)}
为了建立在 Serge 的答案之上,${when(incremental(), 'WHERE ..,')}
块只是将指定的 WHERE
子句附加到已执行的 SQL。可在 here.
中找到解释如何编译执行的 SQL 的有用视频
在您的初始示例中,您已经指定了一个 WHERE
子句,因此这是添加第二个 WHERE
子句,导致执行的查询中意外找到关键字的错误WHERE
.
Serge 提出的解决方案有效,因为这将添加 WHERE
子句,然后添加带有 AND
的额外条件,这(当递增 table 时)将编译以下:
INSERT INTO `<target_table>`
SELECT DATE(timestamp) AS Dates, *
FROM `<mytable in BQ>`
WHERE timestamp > (select max(timestamp) from `<target_table>`
AND storeName like "%XXXX%"
但是我相信这将无法在 <target_table>
的初始构建中包含 storeName
上的过滤器,因为当 table 尚不存在时,${when(incremental(),'WHERE ...')}
子句未附加。这将使编译后的 SQL 如下所示:
CREATE OR REPLACE TABLE `<target_table>` AS
SELECT DATE(timestamp) AS Dates, *
FROM `<mytable in BQ>`
-- NOTE NO `WHERE` CLAUSE
更好的方法是将 ${when(incremental(),...)}
逻辑中的 WHERE
更改为 AND
,如下所示:
config {
type: "incremental",
bigquery: {
partitionBy: "DATE(timestamp)",
clusterBy: ["itemName",'StoreName']
}
}
SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
where storeName like "%XXXX%"
${when(incremental(), `AND timestamp > (select max(timestamp) from ${self()})`)}
这将在最初创建 table 时使用 storeName
上的过滤器成功执行查询,然后对于任何后续运行将执行以下编译的 SQL 以增加:
INSERT INTO `<target_table>`
SELECT DATE(timestamp) AS Dates, *
FROM `<mytable in BQ>`
WHERE storeName like "%XXXX%"
AND timestamp > (select max(timestamp) from `<target_table>`
希望这是有道理的! (这是我对 SO 的第一次回应,对于任何不可靠的格式,我们深表歉意...)
编辑:修改了第二个代码块示例,因为我没有意识到 Serge 在 ${when(incremental(),...)}
配置中包含 storeName
上的过滤器。
我在 Dataform 中使用以下代码创建了一个 BigQuery table:
config {
type: "table",
bigquery: {
partitionBy: "DATE(timestamp)",
clusterBy: ["itemName",'StoreName']
}
}
SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
where storeName like "%XXXX%"
${when(incremental(), `where timestamp > (select max(timestamp) from ${self()})`)}
效果很好。但是,我想安排每日更新做以下事情:
config {
type: "incremental",
bigquery: {
partitionBy: "DATE(timestamp)",
clusterBy: ["itemName",'StoreName']
}
}
SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
where storeName like "%XXXX%"
${when(incremental(), `where timestamp > (select max(timestamp) from ${self()})`)}
但这不起作用,因为我收到以下错误:
因此,从错误消息中我意识到我所做的实际上创建了 2 个 where 语句。因此,我尝试在带有 AND 的增量子句中使用过滤器,但这也不起作用。我找到了 tables 的增量更新,称为 updatePartitionFilter
,但我不确定它是否可以用于此。
config {
type: "incremental",
bigquery: {
partitionBy: "DATE(timestamp)",
updatePartitionFilter:
"storeName like '%XXXX%'",
clusterBy: ["itemName",'StoreName']
}
}
SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
${when(incremental(), `where timestamp > (select max(timestamp) from ${self()})`)}
这样做正确吗?由于我每天都在更新,所以我无法在明天之前对此进行测试。我没时间等了。
我设法解决了这个问题。
updatePartitionFilter:
"storeName like '%XXXX%'",
不是执行此操作的方法,因为它会影响分区,而不是选择的内容。
实际需要做的是在查询末尾移动 incremental
语句中的第二个子句:
config {
type: "incremental",
bigquery: {
partitionBy: "DATE(timestamp)",
clusterBy: ["itemName",'StoreName']
}
}
SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
${when(incremental(), `where timestamp > (select max(timestamp) from ${self()}) and storeName like '%XXXX%'`)}
事实上,需要在此处插入任何其他过滤器。
SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
${when(incremental(), `where timestamp > (select max(timestamp) from ${self()})
and storeName like '%XXXX%'
and itemName = 'My little pony'`)}
为了建立在 Serge 的答案之上,${when(incremental(), 'WHERE ..,')}
块只是将指定的 WHERE
子句附加到已执行的 SQL。可在 here.
在您的初始示例中,您已经指定了一个 WHERE
子句,因此这是添加第二个 WHERE
子句,导致执行的查询中意外找到关键字的错误WHERE
.
Serge 提出的解决方案有效,因为这将添加 WHERE
子句,然后添加带有 AND
的额外条件,这(当递增 table 时)将编译以下:
INSERT INTO `<target_table>`
SELECT DATE(timestamp) AS Dates, *
FROM `<mytable in BQ>`
WHERE timestamp > (select max(timestamp) from `<target_table>`
AND storeName like "%XXXX%"
但是我相信这将无法在 <target_table>
的初始构建中包含 storeName
上的过滤器,因为当 table 尚不存在时,${when(incremental(),'WHERE ...')}
子句未附加。这将使编译后的 SQL 如下所示:
CREATE OR REPLACE TABLE `<target_table>` AS
SELECT DATE(timestamp) AS Dates, *
FROM `<mytable in BQ>`
-- NOTE NO `WHERE` CLAUSE
更好的方法是将 ${when(incremental(),...)}
逻辑中的 WHERE
更改为 AND
,如下所示:
config {
type: "incremental",
bigquery: {
partitionBy: "DATE(timestamp)",
clusterBy: ["itemName",'StoreName']
}
}
SELECT DATE(timestamp) as Dates, *
from `<mytable in BQ>`
where storeName like "%XXXX%"
${when(incremental(), `AND timestamp > (select max(timestamp) from ${self()})`)}
这将在最初创建 table 时使用 storeName
上的过滤器成功执行查询,然后对于任何后续运行将执行以下编译的 SQL 以增加:
INSERT INTO `<target_table>`
SELECT DATE(timestamp) AS Dates, *
FROM `<mytable in BQ>`
WHERE storeName like "%XXXX%"
AND timestamp > (select max(timestamp) from `<target_table>`
希望这是有道理的! (这是我对 SO 的第一次回应,对于任何不可靠的格式,我们深表歉意...)
编辑:修改了第二个代码块示例,因为我没有意识到 Serge 在 ${when(incremental(),...)}
配置中包含 storeName
上的过滤器。