Synapse Spark SQL 增量合并不匹配输入错误

Synapse Spark SQL Delta Merge Mismatched Input Error

我正在尝试更新历史记录 table,但出现合并错误。当我 运行 这个单元格时:

%%sql

select * from main
UNION
select * from historical
where Summary_Employee_ID=25148

我得到两行 table,如下所示: 员工ID姓名 25148 温迪·克拉佩特 25148 温蒂猴子

我正在尝试更新名称...使用以下合并命令

%%sql
MERGE INTO main m
using historical h
on m.Employee_ID=h.Employee_ID
WHEN MATCHED THEN 
    UPDATE SET 
        m.Employee_ID=h.Employee_ID,
        m.Name=h.Name
WHEN NOT MATCHED THEN
    INSERT(Employee,Name)
    VALUES(h.Employee,h.Name)

这是我的错误:

错误: 不匹配的输入 'MERGE' 期待 {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE' , 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(第 1 行, 位置 0)

您的目标是更新目标 table historical,但根据您的查询,目标 table 设置为 main 而不是 historical以及更新语句设置为 main 和插入语句设置为 historical

尝试以下操作,

%%sql
MERGE INTO historical target
using main source
on source.Employee_ID=target.Employee_ID
WHEN MATCHED THEN 
    UPDATE SET 
        target.Name=source.Name
WHEN NOT MATCHED THEN
    INSERT(Employee,Name)
    VALUES(source.Employee,source.Name)

Synapse 不支持 sql 合并,就像数据块一样。但是,您可以使用 python 解决方案。注意历史真的是我的更新...

所以对于上面的内容,我使用了:

import delta

main = delta.DeltaTable.forPath(spark,"path")

(main
    .alias("main")
    .merge(historical.alias("historical"),
    .whenMatchedUpdate(set = {main.Employee_ID=historical.Employee_ID})
    .whenNotMathcedInsert(values = 
        {"employeeID":"historical.employeeID","name"="historical.name})
.execute()
)

目前处于预览状态的 Spark 3.0 支持它,所以这可能值得一试。我确实在 Spark 3.0 池中看到了同样的错误,但它非常具有误导性,因为它实际上意味着您正在尝试合并重复数据,或者您正在向原始数据集提供重复数据。我已经通过使用无服务器 SQL 池和 Polybase 查询 delta 湖和原始文件的重复项来验证这一点。