将相同键添加到增量 table 合并的方法
Way to add same keys to delta table merge
我有一个增量table。在这个增量 table 中,包含重复键。例如:
id age
1 22
1 23
1 25
2 22
2 11
将新的 table 合并到如下所示的增量 table 时:
id age
1 23
1 24
1 23
2 21
2 12
使用这个函数:
def upsertToDelta(microBatchOutputDF):
(student_table.alias("t").merge(
microBatchOutputDF.alias("s"),
"s.id = t.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute())
它抛出一个错误:
Cannot perform Merge as multiple source rows matched and attempted to modify the same
我明白为什么会这样,但我想知道如何删除旧密钥并插入新密钥,即使 ID 相同。所以结果 table 应该是这样的:
id age
1 23
1 24
1 23
2 21
2 12
有办法吗?
这看起来像 SCD type 1
更改,我们用新数据覆盖旧数据。要处理这个问题,您必须至少有一个唯一的作为合并键。一个简单的 row_number
也可以满足您的情况,如下所示:
合并前:
在新数据中添加 row_number,按 id 列分区。这在下面的合并语句中处理。 (打印此处以供理解)
合并SQL:
MERGE INTO student_table AS target
USING (
SELECT id AS merge_key, id, age
FROM microBatchOutputDF
WHERE id IN (
SELECT DISTINCT id
FROM student_table
)
UNION ALL
SELECT NULL AS merge_key, id, age
FROM microBatchOutputDF
WHERE id IN (
SELECT DISTINCT id
FROM student_table
)
) AS source
ON target.id = source.id
AND target.id = source.merge_key
WHEN MATCHED
THEN
DELETE
WHEN NOT MATCHED AND source.merge_key IS NULL
THEN
INSERT (target.id, target.row_num, target.age)
VALUES (source.id, 1, source.age)
;
结果:
我有一个增量table。在这个增量 table 中,包含重复键。例如:
id age
1 22
1 23
1 25
2 22
2 11
将新的 table 合并到如下所示的增量 table 时:
id age
1 23
1 24
1 23
2 21
2 12
使用这个函数:
def upsertToDelta(microBatchOutputDF):
(student_table.alias("t").merge(
microBatchOutputDF.alias("s"),
"s.id = t.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute())
它抛出一个错误:
Cannot perform Merge as multiple source rows matched and attempted to modify the same
我明白为什么会这样,但我想知道如何删除旧密钥并插入新密钥,即使 ID 相同。所以结果 table 应该是这样的:
id age
1 23
1 24
1 23
2 21
2 12
有办法吗?
这看起来像 SCD type 1
更改,我们用新数据覆盖旧数据。要处理这个问题,您必须至少有一个唯一的作为合并键。一个简单的 row_number
也可以满足您的情况,如下所示:
合并前:
在新数据中添加 row_number,按 id 列分区。这在下面的合并语句中处理。 (打印此处以供理解)
合并SQL:
MERGE INTO student_table AS target
USING (
SELECT id AS merge_key, id, age
FROM microBatchOutputDF
WHERE id IN (
SELECT DISTINCT id
FROM student_table
)
UNION ALL
SELECT NULL AS merge_key, id, age
FROM microBatchOutputDF
WHERE id IN (
SELECT DISTINCT id
FROM student_table
)
) AS source
ON target.id = source.id
AND target.id = source.merge_key
WHEN MATCHED
THEN
DELETE
WHEN NOT MATCHED AND source.merge_key IS NULL
THEN
INSERT (target.id, target.row_num, target.age)
VALUES (source.id, 1, source.age)
;
结果: