Azure Synapse 按顺序处理 20k 语句的最快方法
Azure Synapse fastest way to process 20k statements in order
我正在为基于云的数据库 (Azure) 设计增量更新过程。唯一现有的更改日志是一个 .txt 文件,它记录了数据库处理的每个插入、删除和更新语句。没有可用的更改数据捕获 table,或任何记录更改的数据库 table,我无法在数据库上启用水印。 .txt 文件的结构如下:
update [table] set x = 'data' where y = 'data'
go
insert into [table] values (data)
go
delete from [table] where x = data
go
我已经构建了将 .txt 文件转换为云中 table 的流程,如下所示:
update_id | db_operation | statement | user | processed_flag
----------|--------------|-------------------------------------------------|-------|---------------
1 | 'update' | 'update [table] set x = data where y = data' | user1 | 0
2 | 'insert' | 'insert into [table] values (data)' | user2 | 0
3 | 'delete' | 'delete from [table] where x = data' | user3 | 1
我使用这段代码创建一个临时的 table 未处理交易,然后循环 table,创建一个 sql 语句,然后执行该交易。
CREATE TABLE temp_incremental_updates
WITH
(
DISTRIBUTION = HASH ( [user] ),
HEAP
)
AS
SELECT ROW_NUMBER() OVER(ORDER BY (SELECT NULL)) AS Sequence,
[user],
[statement]
FROM upd.incremental_updates
WHERE processed_flag = 0;
DECLARE @nbr_statements INT = (SELECT COUNT(*) FROM temp_incremental_updates),
@i INT = 1;
WHILE @i <= @nbr_statements
BEGIN
DECLARE @sql_code NVARCHAR(4000) = (SELECT [statement] FROM temp_incremental_updates WHERE Sequence = @i);
EXEC sp_executesql @sql_code;
SET @i +=1;
END
DROP TABLE temp_incremental_updates;
UPDATE incremental_updates SET processed_flag = 1
这需要很长时间,一个多小时。有没有其他方法可以快速处理需要按特定顺序出现的多个 sql 语句?顺序是相关的,因为,例如:如果我尝试在创建该数据的插入语句之前处理删除语句,azure synapse 将引发错误。
尝试声明一个游标以一次性从 temp_incremental_updates 中选择所有数据,而不是进行多次读取:
CREATE TABLE temp_incremental_updates
WITH
(
DISTRIBUTION = HASH ( [user] ),
HEAP
)
AS
SELECT ROW_NUMBER() OVER(ORDER BY (SELECT NULL)) AS Sequence,
[user],
[statement]
FROM upd.incremental_updates
WHERE processed_flag = 0;
DECLARE cur CURSOR FOR SELECT [statement] FROM temp_incremental_updates ORDER BY Sequence
OPEN cur
FETCH NEXT FROM cur INTO @sql_code
WHILE @@FETCH_STATUS = 0 BEGIN
EXEC sp_executesql @sql_code;
FETCH NEXT FROM cur INTO @sql_code
END
-- Rest of the code
2 万条独立语句不到 2 小时对 Synapse 来说已经很不错了!
Synapse 并非用于事务处理。您需要将单个更新转换为批量更新,并对大批量或行执行 statements like MERGE
,而不是对每行执行 INSERT
、UPDATE
和 DELETE
。
根据您的情况,您可以:
- 将所有 inserts/updates 按 table 名称分组
- 为每个组创建临时 table。例如。
table1_insert_updates
- 运行
MERGE
喜欢从 table1_insert_updates
到 table1
的语句。
对于删除:
- 按table名称对主键进行分组
- 运行 每 table 个
DELETE FROM table1 where key in (primary keys)
。
坦率地说,20k 是一个糟糕的数字,它不是太小,也远远不够大。因此,即使在“分组”之后,如果 batch/group 尺寸太小,您仍然可能会遇到性能问题。
Synapse 不适用于事务处理。它将在不到 5 分钟的时间内将具有一百万行的 table 合并为具有十亿行的 table 使用单个 MERGE 语句更新一百万行,但是如果您 运行 1000一个接一个删除和1000个插入语句可能需要更长的时间!
编辑:您还必须使用 PARTITION BY
和 RANK
(或 ROWNUMBER
)来删除重复数据,以防在单个批次中对同一行进行多次更新.取决于您的输入方式(更新包含所有列(甚至未更改)或仅更改列)并不容易,这可能会变得非常复杂。
同样 Synapse 不适用于事务处理。
我正在为基于云的数据库 (Azure) 设计增量更新过程。唯一现有的更改日志是一个 .txt 文件,它记录了数据库处理的每个插入、删除和更新语句。没有可用的更改数据捕获 table,或任何记录更改的数据库 table,我无法在数据库上启用水印。 .txt 文件的结构如下:
update [table] set x = 'data' where y = 'data'
go
insert into [table] values (data)
go
delete from [table] where x = data
go
我已经构建了将 .txt 文件转换为云中 table 的流程,如下所示:
update_id | db_operation | statement | user | processed_flag
----------|--------------|-------------------------------------------------|-------|---------------
1 | 'update' | 'update [table] set x = data where y = data' | user1 | 0
2 | 'insert' | 'insert into [table] values (data)' | user2 | 0
3 | 'delete' | 'delete from [table] where x = data' | user3 | 1
我使用这段代码创建一个临时的 table 未处理交易,然后循环 table,创建一个 sql 语句,然后执行该交易。
CREATE TABLE temp_incremental_updates
WITH
(
DISTRIBUTION = HASH ( [user] ),
HEAP
)
AS
SELECT ROW_NUMBER() OVER(ORDER BY (SELECT NULL)) AS Sequence,
[user],
[statement]
FROM upd.incremental_updates
WHERE processed_flag = 0;
DECLARE @nbr_statements INT = (SELECT COUNT(*) FROM temp_incremental_updates),
@i INT = 1;
WHILE @i <= @nbr_statements
BEGIN
DECLARE @sql_code NVARCHAR(4000) = (SELECT [statement] FROM temp_incremental_updates WHERE Sequence = @i);
EXEC sp_executesql @sql_code;
SET @i +=1;
END
DROP TABLE temp_incremental_updates;
UPDATE incremental_updates SET processed_flag = 1
这需要很长时间,一个多小时。有没有其他方法可以快速处理需要按特定顺序出现的多个 sql 语句?顺序是相关的,因为,例如:如果我尝试在创建该数据的插入语句之前处理删除语句,azure synapse 将引发错误。
尝试声明一个游标以一次性从 temp_incremental_updates 中选择所有数据,而不是进行多次读取:
CREATE TABLE temp_incremental_updates
WITH
(
DISTRIBUTION = HASH ( [user] ),
HEAP
)
AS
SELECT ROW_NUMBER() OVER(ORDER BY (SELECT NULL)) AS Sequence,
[user],
[statement]
FROM upd.incremental_updates
WHERE processed_flag = 0;
DECLARE cur CURSOR FOR SELECT [statement] FROM temp_incremental_updates ORDER BY Sequence
OPEN cur
FETCH NEXT FROM cur INTO @sql_code
WHILE @@FETCH_STATUS = 0 BEGIN
EXEC sp_executesql @sql_code;
FETCH NEXT FROM cur INTO @sql_code
END
-- Rest of the code
2 万条独立语句不到 2 小时对 Synapse 来说已经很不错了!
Synapse 并非用于事务处理。您需要将单个更新转换为批量更新,并对大批量或行执行 statements like MERGE
,而不是对每行执行 INSERT
、UPDATE
和 DELETE
。
根据您的情况,您可以:
- 将所有 inserts/updates 按 table 名称分组
- 为每个组创建临时 table。例如。
table1_insert_updates
- 运行
MERGE
喜欢从table1_insert_updates
到table1
的语句。
对于删除:
- 按table名称对主键进行分组
- 运行 每 table 个
DELETE FROM table1 where key in (primary keys)
。
坦率地说,20k 是一个糟糕的数字,它不是太小,也远远不够大。因此,即使在“分组”之后,如果 batch/group 尺寸太小,您仍然可能会遇到性能问题。
Synapse 不适用于事务处理。它将在不到 5 分钟的时间内将具有一百万行的 table 合并为具有十亿行的 table 使用单个 MERGE 语句更新一百万行,但是如果您 运行 1000一个接一个删除和1000个插入语句可能需要更长的时间!
编辑:您还必须使用 PARTITION BY
和 RANK
(或 ROWNUMBER
)来删除重复数据,以防在单个批次中对同一行进行多次更新.取决于您的输入方式(更新包含所有列(甚至未更改)或仅更改列)并不容易,这可能会变得非常复杂。
同样 Synapse 不适用于事务处理。