将相同键添加到增量 table 合并的方法

Way to add same keys to delta table merge

我有一个增量table。在这个增量 table 中,包含重复键。例如:

id  age
1   22
1   23
1   25
2   22
2   11

将新的 table 合并到如下所示的增量 table 时:

id  age
1   23
1   24
1   23
2   21
2   12

使用这个函数:

def upsertToDelta(microBatchOutputDF):
    (student_table.alias("t").merge(
        microBatchOutputDF.alias("s"),
       "s.id = t.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

它抛出一个错误:

Cannot perform Merge as multiple source rows matched and attempted to modify the same

我明白为什么会这样,但我想知道如何删除旧密钥并插入新密钥,即使 ID 相同。所以结果 table 应该是这样的:

id  age
1   23
1   24
1   23
2   21
2   12

有办法吗?

这看起来像 SCD type 1 更改,我们用新数据覆盖旧数据。要处理这个问题,您必须至少有一个唯一的作为合并键。一个简单的 row_number 也可以满足您的情况,如下所示:

合并前:

在新数据中添加 row_number,按 id 列分区。这在下面的合并语句中处理。 (打印此处以供理解)

合并SQL:

MERGE INTO student_table AS target
USING (
  SELECT id AS merge_key, id, age
  FROM microBatchOutputDF
  WHERE id IN (
      SELECT DISTINCT id
      FROM student_table
      )
  
  UNION ALL
  
  SELECT NULL AS merge_key, id, age
  FROM microBatchOutputDF
  WHERE id IN (
      SELECT DISTINCT id
      FROM student_table
      )
  ) AS source
  ON target.id = source.id 
  AND target.id = source.merge_key

WHEN MATCHED
  THEN
    DELETE

WHEN NOT MATCHED AND source.merge_key IS NULL
  THEN
    INSERT (target.id, target.row_num, target.age)
    VALUES (source.id, 1, source.age)
;

结果: