Synapse Spark SQL 增量合并不匹配输入错误
Synapse Spark SQL Delta Merge Mismatched Input Error
我正在尝试更新历史记录 table,但出现合并错误。当我 运行 这个单元格时:
%%sql
select * from main
UNION
select * from historical
where Summary_Employee_ID=25148
我得到两行 table,如下所示:
员工ID姓名
25148 温迪·克拉佩特
25148 温蒂猴子
我正在尝试更新名称...使用以下合并命令
%%sql
MERGE INTO main m
using historical h
on m.Employee_ID=h.Employee_ID
WHEN MATCHED THEN
UPDATE SET
m.Employee_ID=h.Employee_ID,
m.Name=h.Name
WHEN NOT MATCHED THEN
INSERT(Employee,Name)
VALUES(h.Employee,h.Name)
这是我的错误:
错误:
不匹配的输入 'MERGE' 期待 {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE' , 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(第 1 行, 位置 0)
您的目标是更新目标 table historical
,但根据您的查询,目标 table 设置为 main
而不是 historical
以及更新语句设置为 main
和插入语句设置为 historical
尝试以下操作,
%%sql
MERGE INTO historical target
using main source
on source.Employee_ID=target.Employee_ID
WHEN MATCHED THEN
UPDATE SET
target.Name=source.Name
WHEN NOT MATCHED THEN
INSERT(Employee,Name)
VALUES(source.Employee,source.Name)
Synapse 不支持 sql 合并,就像数据块一样。但是,您可以使用 python 解决方案。注意历史真的是我的更新...
所以对于上面的内容,我使用了:
import delta
main = delta.DeltaTable.forPath(spark,"path")
(main
.alias("main")
.merge(historical.alias("historical"),
.whenMatchedUpdate(set = {main.Employee_ID=historical.Employee_ID})
.whenNotMathcedInsert(values =
{"employeeID":"historical.employeeID","name"="historical.name})
.execute()
)
目前处于预览状态的 Spark 3.0 支持它,所以这可能值得一试。我确实在 Spark 3.0 池中看到了同样的错误,但它非常具有误导性,因为它实际上意味着您正在尝试合并重复数据,或者您正在向原始数据集提供重复数据。我已经通过使用无服务器 SQL 池和 Polybase 查询 delta 湖和原始文件的重复项来验证这一点。
我正在尝试更新历史记录 table,但出现合并错误。当我 运行 这个单元格时:
%%sql
select * from main
UNION
select * from historical
where Summary_Employee_ID=25148
我得到两行 table,如下所示: 员工ID姓名 25148 温迪·克拉佩特 25148 温蒂猴子
我正在尝试更新名称...使用以下合并命令
%%sql
MERGE INTO main m
using historical h
on m.Employee_ID=h.Employee_ID
WHEN MATCHED THEN
UPDATE SET
m.Employee_ID=h.Employee_ID,
m.Name=h.Name
WHEN NOT MATCHED THEN
INSERT(Employee,Name)
VALUES(h.Employee,h.Name)
这是我的错误:
错误: 不匹配的输入 'MERGE' 期待 {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE' , 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(第 1 行, 位置 0)
您的目标是更新目标 table historical
,但根据您的查询,目标 table 设置为 main
而不是 historical
以及更新语句设置为 main
和插入语句设置为 historical
尝试以下操作,
%%sql
MERGE INTO historical target
using main source
on source.Employee_ID=target.Employee_ID
WHEN MATCHED THEN
UPDATE SET
target.Name=source.Name
WHEN NOT MATCHED THEN
INSERT(Employee,Name)
VALUES(source.Employee,source.Name)
Synapse 不支持 sql 合并,就像数据块一样。但是,您可以使用 python 解决方案。注意历史真的是我的更新...
所以对于上面的内容,我使用了:
import delta
main = delta.DeltaTable.forPath(spark,"path")
(main
.alias("main")
.merge(historical.alias("historical"),
.whenMatchedUpdate(set = {main.Employee_ID=historical.Employee_ID})
.whenNotMathcedInsert(values =
{"employeeID":"historical.employeeID","name"="historical.name})
.execute()
)
目前处于预览状态的 Spark 3.0 支持它,所以这可能值得一试。我确实在 Spark 3.0 池中看到了同样的错误,但它非常具有误导性,因为它实际上意味着您正在尝试合并重复数据,或者您正在向原始数据集提供重复数据。我已经通过使用无服务器 SQL 池和 Polybase 查询 delta 湖和原始文件的重复项来验证这一点。