Cloud Data Fusion ETL 从 PostGres 到 BigQuery - 幂等负载

Cloud Data Fusion ETL from PostGres to BigQuery - idempotent load

我正在尝试使用 Google 的云数据融合 (CDF) 将 PostGres 中的一些 OLTP 数据执行 ETL 到 BigQuery (BQ)。我们将每天晚上将一些 select table 的内容复制到 BQ 中的等效 table - 我们将添加一个带有日期戳的列。

所以假设我们有一个 table,其中包含两列 A 和 B,以及 PostGres 中的一行数据

|--------------------|
|    A    |    B     |
|--------------------|
|  egg    |  milk    |
|--------------------|

然后在两天后,BigQuery table 将如下所示

|-------------------------------|
|    ds    |    A    |    B     |
|-------------------------------|
| 22-01-01 |   egg   |   milk   |
|-------------------------------|
| 22-01-02 |   egg   |   milk   |
|-------------------------------|

但是,我担心我在 CDF 中执行此操作的方式不是幂等的,如果管道运行两次,我将在 BQ 中获得给定日期的重复数据(不需要)

一个想法是在执行 ETL 之前删除 BQ 中当天的行(作为同一管道的一部分)。但是,不确定如何执行此操作,或者这是否是最佳做法。有什么想法吗?

您可以在管道开始时删除 BigQuery 操作中的数据,但如果人们主动查询 table,或者如果删除操作成功但其余的管道失败。

BigQuery 接收器允许您将其配置为更新插入数据而不是插入数据。只要您的数据具有可以使用的密钥,这就应该使其幂等。

其他一些可能性是在运行 BigQuery MERGE 的接收器之后放置 BigQuery 执行,或者编写一个自定义条件插件来查询 BigQuery 并且仅在日期数据不存在时才运行管道的其余部分.

您可以使用以下 2 个选项之一,具体取决于您要对信息执行的操作:

选项 1

您可以创建一个具有相同架构 (ds,A,B) 的空白 new_table。您将从 Data Fusion 中将数据插入 old_table。使用 MERGE 语句,您将比较 old_tablenew_table 的数据; new_table 中不存在的数据将被插入,存在且具有不同数据的数据将更新其他数据。

MERGE merge_example.new_table T
USING dataset.old_table S
ON T.ds = S.ds
WHEN MATCHED THEN
 UPDATE SET T.A = s.a, T.B=s.b
WHEN NOT MATCHED THEN
 INSERT (ds,A, B) VALUES(ds, A, B)

选项 2

与方案一的过程相同,但该查询只是将不存在的数据插入new_table.

insert into `dataset.new_table`
select ds, A, B from `dataset.old_table`
where ds not in (select ds from `dataset.new_table`)

选项 1 和选项 2 的区别在于选项 1 将更新 new_table 中具有不同值的现有数据并插入新数据。选项 2 只会插入新数据。

您可以每天使用计划查询执行一次这些查询。你可以看到这个documentation.