BQ table 最新日期的预定增量更新
Scheduled incremental update of BQ table latest date
我真的是数据形式的新手,在阅读文档时,我还没有找到一种方法来做我想做的事情,即以我可以安排它的方式配置我的增量查询运行每天只看我原始数据中的绝对最新数据table.
假设我的 raw_data
位于 BQ 中,我要如何转换该数据并将其加载到新的分区和集群 table,称之为 Transfomed
,但仅在 raw_data
.
的最新上传行上执行此操作
我知道如何通过硬编码昨天的日期来做到这一点:
config {
type: "incremental",
bigquery: {
partitionBy: "DATE(ts)",
clusterBy: ["itemName"]
}
}
pre_operations {
declare event_timestamp_checkpoint default (
${when(incremental(),
`select max(ts) from ${self()}`,
`select timestamp("2021-10-06")`)}
)
}
SELECT distinct
timestamp as ts,
storeName,
DATE(timestamp) as Date,
itemId
.......
我试图每天 运行 它而无需手动执行的是:
config {
type: "incremental",
bigquery: {
partitionBy: "DATE(ts)",
clusterBy: ["itemName"]
}
}
pre_operations {
declare event_timestamp_checkpoint default (
${when(incremental(),
`select max(ts) from ${self()}`,
`select timestamp(DATE(max(ts)-1))}
)
}
因此将硬编码日期 2021-10-06
替换为 DATE(max(ts)-1)
。显然,这没有用。
关于如何解决这个问题有什么想法吗?
为了您的目的,我了解到您想将来自源 table 的所有新数据添加到您的新 table 所有数据中。集群和分区不会影响您的操作方式,唯一的区别是您必须在新 table.
中声明您需要此功能
when()
子句中定义的规则将被编译到 SQL 查询的 WHERE
过滤器中,当 table 已经创建并且管道线正在向 table.
添加数据
您不需要过滤前一天的时间戳,您只需要导入所有比您在目标上的最新数据更新的数据table。
因此您的代码显示如下:
config {
type: "incremental",
bigquery: {
partitionBy: "DATE(ts)",
clusterBy: ["itemName"]
}
}
select ts, storeName, itemId, itemName
from <dataset>.source_table
${when(incremental(), `where ts > (select max(ts) from ${self()})`)}
每次您 运行 此管道时,目标 table 都会更新为新数据。您可以根据需要在查询中转换数据。
这是我的sandbox
的详细信息:
来源table:
上面的数据:
运行在 dataform
中安装管道:
如果您将新数据添加到源 table,只有新行会被
添加到目的地 table:
insert into `<bucket>.<dataset>.source_table` values ('2021-10-20 00:00:00 UTC','zabka',3,'woda3')
运行 管道再次(或调度)并添加新数据:
Here 您可以在 dataform
文档中找到有关如何创建增量 tables 的更多详细信息。
我真的是数据形式的新手,在阅读文档时,我还没有找到一种方法来做我想做的事情,即以我可以安排它的方式配置我的增量查询运行每天只看我原始数据中的绝对最新数据table.
假设我的 raw_data
位于 BQ 中,我要如何转换该数据并将其加载到新的分区和集群 table,称之为 Transfomed
,但仅在 raw_data
.
我知道如何通过硬编码昨天的日期来做到这一点:
config {
type: "incremental",
bigquery: {
partitionBy: "DATE(ts)",
clusterBy: ["itemName"]
}
}
pre_operations {
declare event_timestamp_checkpoint default (
${when(incremental(),
`select max(ts) from ${self()}`,
`select timestamp("2021-10-06")`)}
)
}
SELECT distinct
timestamp as ts,
storeName,
DATE(timestamp) as Date,
itemId
.......
我试图每天 运行 它而无需手动执行的是:
config {
type: "incremental",
bigquery: {
partitionBy: "DATE(ts)",
clusterBy: ["itemName"]
}
}
pre_operations {
declare event_timestamp_checkpoint default (
${when(incremental(),
`select max(ts) from ${self()}`,
`select timestamp(DATE(max(ts)-1))}
)
}
因此将硬编码日期 2021-10-06
替换为 DATE(max(ts)-1)
。显然,这没有用。
关于如何解决这个问题有什么想法吗?
为了您的目的,我了解到您想将来自源 table 的所有新数据添加到您的新 table 所有数据中。集群和分区不会影响您的操作方式,唯一的区别是您必须在新 table.
中声明您需要此功能when()
子句中定义的规则将被编译到 SQL 查询的 WHERE
过滤器中,当 table 已经创建并且管道线正在向 table.
您不需要过滤前一天的时间戳,您只需要导入所有比您在目标上的最新数据更新的数据table。
因此您的代码显示如下:
config {
type: "incremental",
bigquery: {
partitionBy: "DATE(ts)",
clusterBy: ["itemName"]
}
}
select ts, storeName, itemId, itemName
from <dataset>.source_table
${when(incremental(), `where ts > (select max(ts) from ${self()})`)}
每次您 运行 此管道时,目标 table 都会更新为新数据。您可以根据需要在查询中转换数据。
这是我的sandbox
的详细信息:
来源table:
上面的数据:
运行在 dataform
中安装管道:
如果您将新数据添加到源 table,只有新行会被 添加到目的地 table:
insert into `<bucket>.<dataset>.source_table` values ('2021-10-20 00:00:00 UTC','zabka',3,'woda3')
运行 管道再次(或调度)并添加新数据:
Here 您可以在 dataform
文档中找到有关如何创建增量 tables 的更多详细信息。