Azure Synapse 按顺序处理 20k 语句的最快方法

Azure Synapse fastest way to process 20k statements in order

我正在为基于云的数据库 (Azure) 设计增量更新过程。唯一现有的更改日志是一个 .txt 文件,它记录了数据库处理的每个插入、删除和更新语句。没有可用的更改数据捕获 table,或任何记录更改的数据库 table,我无法在数据库上启用水印。 .txt 文件的结构如下:

update [table] set x = 'data' where y = 'data'
go
insert into [table] values (data)
go
delete from [table] where x = data
go

我已经构建了将 .txt 文件转换为云中 table 的流程,如下所示:

update_id | db_operation | statement                                       | user  | processed_flag
----------|--------------|-------------------------------------------------|-------|---------------
1         | 'update'     | 'update [table] set x = data where y = data'    | user1 | 0
2         | 'insert'     | 'insert into [table] values (data)'             | user2 | 0
3         | 'delete'     | 'delete from [table] where x = data'            | user3 | 1

我使用这段代码创建一个临时的 table 未处理交易,然后循环 table,创建一个 sql 语句,然后执行该交易。

CREATE TABLE temp_incremental_updates
WITH
( 
    DISTRIBUTION = HASH ( [user] ),
    HEAP
)
AS
SELECT  ROW_NUMBER() OVER(ORDER BY (SELECT NULL)) AS Sequence, 
        [user],
        [statement]
FROM    upd.incremental_updates
WHERE   processed_flag = 0;

DECLARE @nbr_statements INT = (SELECT COUNT(*) FROM temp_incremental_updates),
        @i INT = 1;

WHILE   @i <= @nbr_statements
BEGIN
    DECLARE @sql_code NVARCHAR(4000) = (SELECT [statement] FROM temp_incremental_updates WHERE Sequence = @i);
    EXEC    sp_executesql @sql_code;
    SET     @i +=1;
END

DROP TABLE temp_incremental_updates;

UPDATE incremental_updates SET processed_flag = 1

这需要很长时间,一个多小时。有没有其他方法可以快速处理需要按特定顺序出现的多个 sql 语句?顺序是相关的,因为,例如:如果我尝试在创建该数据的插入语句之前处理删除语句,azure synapse 将引发错误。

尝试声明一个游标以一次性从 temp_incremental_updates 中选择所有数据,而不是进行多次读取:

CREATE TABLE temp_incremental_updates
WITH
( 
    DISTRIBUTION = HASH ( [user] ),
    HEAP
)
AS
SELECT  ROW_NUMBER() OVER(ORDER BY (SELECT NULL)) AS Sequence, 
        [user],
        [statement]
FROM    upd.incremental_updates
WHERE   processed_flag = 0;

DECLARE cur CURSOR FOR SELECT [statement] FROM temp_incremental_updates ORDER BY Sequence

OPEN cur
FETCH NEXT FROM cur INTO @sql_code
WHILE @@FETCH_STATUS = 0 BEGIN
  EXEC sp_executesql @sql_code;
  FETCH NEXT FROM cur INTO @sql_code
END

-- Rest of the code

2 万条独立语句不到 2 小时对 Synapse 来说已经很不错了!

Synapse 并非用于事务处理。您需要将单个更新转换为批量更新,并对大批量或行执行 statements like MERGE,而不是对每行执行 INSERTUPDATEDELETE

根据您的情况,您可以:

  • 将所有 inserts/updates 按 table 名称分组
  • 为每个组创建临时 table。例如。 table1_insert_updates
  • 运行 MERGE 喜欢从 table1_insert_updatestable1 的语句。

对于删除:

  • 按table名称对主键进行分组
  • 运行 每 table 个 DELETE FROM table1 where key in (primary keys)

坦率地说,20k 是一个糟糕的数字,它不是太小,也远远不够大。因此,即使在“分组”之后,如果 batch/group 尺寸太小,您仍然可能会遇到性能问题。

Synapse 不适用于事务处理。它将在不到 5 分钟的时间内将具有一百万行的 table 合并为具有十亿行的 table 使用单个 MERGE 语句更新一百万行,但是如果您 运行 1000一个接一个删除和1000个插入语句可能需要更长的时间!


编辑:您还必须使用 PARTITION BYRANK(或 ROWNUMBER)来删除重复数据,以防在单个批次中对同一行进行多次更新.取决于您的输入方式(更新包含所有列(甚至未更改)或仅更改列)并不容易,这可能会变得非常复杂。

同样 Synapse 不适用于事务处理。