Using MERGE with analytic functions (like RANK) on the target table
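I have a data pipeline in which a staging table named stg is truncated and overwritten with new records. Then, using MERGE, the records in stg should be merged into the dimension table dim (a type 2 slowly changing dimension) according to the following rules:
- When an email exists in stg but not in dim, insert the row for that email into dim, labeled 'INSERT'.
- When an email exists in both stg and dim, check whether the corresponding data differs. If it does, label the new row 'UPDATE'.
- When an email does not exist in stg but does exist in dim, it has been deleted, so label it 'DELETE'.
Email is unique for each user in this organization.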
MERGE dim
USING stg
ON stg.email = dim.email
WHEN NOT MATCHED
INSERT(email, first_name, last_name, last_active, dml_type, extraction_timestamp, state_index)
VALUES(stg.email, stg.first_name, stg.last_name, stg.last_active, 'INSERT', stg.extraction_timestamp))
WHEN MATCHED AND stg.row_hash <> dim.row_hash
INSERT(email, first_name, last_name, last_active, dml_type, extraction_timestamp, state_index)
VALUES(stg.email, stg.first_name, stg.last_name, stg.last_active, 'UPDATE', stg.extraction_timestamp)
WHEN NOT MATCHED BY SOURCE
INSERT(email, first_name, last_name, last_active, dml_type, extraction_timestamp, state_index)
VALUES(stg.email, NULL, NULL, NULL, 'DELETE', stg.extraction_timestamp)
The problem? This query compares stg against the whole of dim, whereas I only want to compare it against the following subset of dim:
select *
from (
select *,
RANK() OVER(PARTITION BY email ORDER BY extraction_timestamp DESC) as rnk
from dim
) as hist
where rnk = 1
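Can I MERGE with dim as my target table and stg as my source, but match only against the rows where rnk = 1, as computed by the RANK() analytic function shown above?
Something like this?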
MERGE (
select *,
RANK() OVER(PARTITION BY email ORDER BY extraction_timestamp DESC) as rnk
from dim
) as dim_with_rank
USING stg
ON stg.email = dim_with_rank.email
AND dim_with_rank.rnk = 1
WHEN NOT MATCHED
INSERT(email, first_name, last_name, last_active, dml_type, extraction_timestamp, state_index)
VALUES(stg.email, stg.first_name, stg.last_name, stg.last_active, 'INSERT', stg.extraction_timestamp))
WHEN MATCHED AND stg.row_hash <> dim.row_hash
INSERT(email, first_name, last_name, last_active, dml_type, extraction_timestamp, state_index)
VALUES(stg.email, stg.first_name, stg.last_name, stg.last_active, 'UPDATE', stg.extraction_timestamp)
WHEN NOT MATCHED BY SOURCE
INSERT(email, first_name, last_name, last_active, dml_type, extraction_timestamp, state_index)
VALUES(stg.email, NULL, NULL, NULL, 'DELETE', stg.extraction_timestamp)
Unfortunately, you cannot start a MERGE with a subquery as the target, as in your example:
MERGE (
select *,
RANK() OVER(PARTITION BY email ORDER BY extraction_timestamp DESC) as rnk
from dim)
as dim_with_rank
USING stg
ON stg.email = dim_with_rank.email
AND dim_with_rank.rnk = 1
You need to structure the query like this instead, with the subquery in the USING clause:
MERGE dim
USING (
select *, RANK() OVER(PARTITION BY email ORDER BY extraction_timestamp DESC) as rnk
from stg )
as stg_with_rank
ON stg_with_rank.email = dim.email AND
stg_with_rank.rnk = 1 [...]
You can find more information about this use case here.
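Since the MERGE target has to be a table, one possible way to still compare stg only against the latest row per email in dim is to do that comparison inside the USING subquery and then insert every resulting row. The following is only a sketch under several assumptions that are not in the question: a BigQuery-style dialect, a row_hash column in both tables, extraction_timestamp being a TIMESTAMP, and CURRENT_TIMESTAMP() as a stand-in timestamp for 'DELETE' rows (stg has no row to take it from). state_index is left out because the question does not say how it is populated.

```sql
-- Sketch only; dialect, column types, and the DELETE timestamp are assumptions.
MERGE dim
USING (
  SELECT
    COALESCE(s.email, l.email) AS email,
    s.first_name,
    s.last_name,
    s.last_active,
    CASE
      WHEN l.email IS NULL THEN 'INSERT'   -- email seen for the first time
      WHEN s.email IS NULL THEN 'DELETE'   -- email no longer present in stg
      ELSE 'UPDATE'                        -- email present in both, data differs
    END AS dml_type,
    COALESCE(s.extraction_timestamp, CURRENT_TIMESTAMP()) AS extraction_timestamp
  FROM stg AS s
  FULL OUTER JOIN (
    -- Latest row per email in dim: the rnk = 1 subset from the question.
    SELECT *
    FROM (
      SELECT *,
             RANK() OVER (PARTITION BY email ORDER BY extraction_timestamp DESC) AS rnk
      FROM dim
    )
    WHERE rnk = 1
  ) AS l
    ON s.email = l.email
  WHERE l.email IS NULL
     -- Skip emails whose latest row is already a DELETE marker.
     OR (s.email IS NULL AND l.dml_type <> 'DELETE')
     -- IS DISTINCT FROM also catches a NULL hash on the latest dim row.
     OR (s.email IS NOT NULL AND l.email IS NOT NULL
         AND s.row_hash IS DISTINCT FROM l.row_hash)
) AS changes
ON FALSE  -- never matches, so every row in changes goes to the insert branch
WHEN NOT MATCHED THEN
  INSERT (email, first_name, last_name, last_active, dml_type, extraction_timestamp)
  VALUES (changes.email, changes.first_name, changes.last_name, changes.last_active,
          changes.dml_type, changes.extraction_timestamp);
```

Because every branch of the original MERGE ends in an INSERT, the same changes subquery could probably feed a plain INSERT INTO dim ... SELECT as well; MERGE is kept here only to stay close to the question. Note that RANK() returns more than one row with rnk = 1 if an email has ties on extraction_timestamp, so ROW_NUMBER() may be the safer choice if ties are possible.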