如何正确截断 ETL 管道中的分段 table?

How to properly truncate a staging table in an ETL pipeline?

我们有一个 ETL 管道,为每个上传到存储帐户 (Azure) 的 CSV 运行。它在 CSV 上运行一些转换并将输出写入另一个位置,也作为 CSV,并调用数据库 (SQL Azure) 上的存储过程,该过程将生成的 CSV 摄取 (BULK INSERT) 到暂存 table.

此管道可以并发执行,因为多个资源可以将文件上传到存储。因此,分段 table 经常插入数据。

然后,我们有一个计划的 SQL 作业(弹性作业)触发一个 SP,将数据从暂存 table 移动到最终 table。 在这一点上,我们希望 truncate/empty 暂存 table 这样我们就不会在下一次执行作业时重新插入它们。

问题是,我们无法确定在从暂存 table 加载到最终 table 和截断命令之间,没有任何新数据写入暂存 table 可以被截断而无需首先插入到最后的 table.

有没有办法在我们将数据复制到最终 table 时锁定暂存 table 以便尝试写入它的 SP(从 ETL 管道调用)只是等到锁被释放?这是否可以通过使用事务或一些手动锁定命令来实现?

如果不是,处理此问题的最佳方法是什么?

Is there a way to lock the staging table while we're copying the data into the final table so that the SP (called from the ETL pipeline) trying to write to it will just wait until the lock is release? Is this achievable by using transactions or maybe some manual lock commands?

看来您正在寻找一种比交易级别更广泛的机制。 SQL Server/Azure SQL DB 有一个,叫做 application lock:

sp_getapplock

Places a lock on an application resource.

Locks placed on a resource are associated with either the current transaction or the current session. Locks associated with the current transaction are released when the transaction commits or rolls back.Locks associated with the session are released when the session is logged out. When the server shuts down for any reason, all locks are released.

Locks can be explicitly released with sp_releaseapplock. When an application calls sp_getapplock multiple times for the same lock resource, sp_releaseapplock must be called the same number of times to release the lock. When a lock is opened with the Transaction lock owner, that lock is released when the transaction is committed or rolled back.

这基本上意味着您的 ETL 工具应该打开与数据库的单个会话,获取锁并在完成后释放。其他会话在尝试做任何事情之前应该尝试获取锁(他们不能因为它已经被占用),等到它释放并继续工作。

我喜欢 sp_getapplock 并且我自己在几个地方使用了这种方法,因为它非常灵活,而且您可以完全控制锁定逻辑和等待时间。

我看到的唯一问题是,在您的情况下,并发进程并不完全相同。

您的 SP1 将数据从分段 table 移动到主 table。您的系统从不尝试 运行 此 SP 的多个实例。

另外一个SP2把数据插入stagingtable可以同时运行几次就可以了

很容易实施锁定,以防止 SP1 或 SP2 的任何组合的任何并发 运行。换句话说,如果 SP1 和 SP2 的锁定逻辑相同并且它们被同等对待,则很容易。但是,您不能同时拥有多个 SP2 运行ning 实例。

目前尚不清楚如何实施锁定以防止 SP1 和 SP2 并发 运行,同时允许 SP2 的多个实例同时 运行。


还有另一种方法,它不试图阻止 SP 的并发 运行,而是拥抱并期望并发 运行 是可能的。

一种方法是向暂存 table 添加一个 IDENTITY 列。或者一个自动填充的日期时间,如果你能保证它是唯一的并且永远不会减少的话,这可能很棘手。或 rowversion 列。

SP2 中将数据插入分段 table 的逻辑没有改变。

SP1 中将数据从暂存 table 移动到主要 table 的逻辑需要使用这些标识值。

首先从stagingtable中读取identity的当前最大值,并将其存储在一个变量中,比如@MaxID。该 SP1 中暂存 table 中的所有后续 SELECT、UPDATE 和 DELETE 都应包含过滤器 WHERE ID <= @MaxID.

这将确保如果在 SP1 运行ning 期间碰巧有一个新行添加到暂存 table,该行将不会被处理并保留在暂存中 table 直到 SP1 的下一个 运行。

这种方法的缺点是不能用TRUNCATE,需要用DELETEWHERE ID <= @MaxID


如果您同意多个 SP2 实例相互等待(和 SP1),则可以使用类似于以下内容的 sp_getapplock。我的存储过程中有这段代码。您应该将此逻辑放入 SP1 和 SP2。

这里我没有显式调用sp_releaseapplock,因为锁所有者设置为事务,引擎会在事务结束时自动释放锁。

您不必在存储过程中放置​​重试逻辑,运行这些存储过程可以在外部代码中。在任何情况下,您的代码都应该准备好重试。

CREATE PROCEDURE SP2  -- or SP1
AS
BEGIN
    SET NOCOUNT ON;
    SET XACT_ABORT ON;

    BEGIN TRANSACTION;
    BEGIN TRY
        -- Maximum number of retries
        DECLARE @VarCount int = 10;

        WHILE (@VarCount > 0)
        BEGIN
            SET @VarCount = @VarCount - 1;

            DECLARE @VarLockResult int;
            EXEC @VarLockResult = sp_getapplock
                @Resource = 'StagingTable_app_lock',
                -- this resource name should be the same in SP1 and SP2
                @LockMode = 'Exclusive',
                @LockOwner = 'Transaction',
                @LockTimeout = 60000,
                -- I'd set this timeout to be about twice the time
                -- you expect SP to run normally
                @DbPrincipal = 'public';

            IF @VarLockResult >= 0
            BEGIN
                -- Acquired the lock

                -- for SP2
                -- INSERT INTO StagingTable ...

                -- for SP1
                -- SELECT FROM StagingTable ...
                -- TRUNCATE StagingTable ...

                -- don't retry any more
                BREAK;
            END ELSE BEGIN
                -- wait for 5 seconds and retry
                WAITFOR DELAY '00:00:05';
            END;
        END;

        COMMIT TRANSACTION;
    END TRY
    BEGIN CATCH
        ROLLBACK TRANSACTION;
        -- log error
    END CATCH;

END

此代码保证在任何给定时刻只有一个过程与分期 table 一起工作。没有并发性。所有其他实例将等待。

显然,如果您尝试不通过这些 SP1 或 SP2(首先尝试获取锁)访问暂存 table,则此类访问不会被阻止。

我建议使用两个相同的分段 table 解决方案。让我们将它们命名为 StageLoading 和 StageProcessing。
加载过程将有以下步骤:
1、一开始两个table都是空的
2. 我们将一些数据加载到 StageLoading table(我假设每个加载都是一个事务)。
3. 当 Elastic 作业启动时,它将执行:
- ALTER TABLE SWITCH 将所有数据从 StageLoading 移动到 StageProcessing。它将使 StageLoading 清空并为下一次加载做好准备。这是一个元数据操作,因此需要几毫秒并且它是完全阻塞的,因此将在加载之间完成。
- 将数据从 StageProcessing 加载到最终 tables.
- 截断 table StageProcessing。
4. 现在我们准备好进行下一个 Elastic 作业了。

如果我们在 StageProcessing 不为空时尝试执行 SWITCH,ALTER 将失败,这意味着最后一个加载过程失败。

假设您只有一份出境工作

  • 将 OutboundProcessing BIT DEFAULT 0 添加到 table
  • 在作业中,SET OutboundProcessing = 1 WHERE OutboundProcessing = 0(声明行)
  • 对于 ETL,在获取数据(传输行)的查询中合并 WHERE OutboundProcessing = 1
  • 在 ETL 之后,DELETE FROM TABLE WHERE OutboundProcessing = 1(删除您传输的行)
  • 如果 ETL 失败,SET OutboundProcessing = 0 WHERE OutboundProcessing = 1

我总是喜欢 "ID" 我收到的每个文件。如果可以做到这一点,则可以在整个加载过程中关联给定文件中的记录。你没有说需要这个,但只是说。

然而,每个文件都有一个身份(应该只有一个 int/bigint 身份值),然后您可以从 "template" 中动态创建任意数量的负载 table加载 table.

  1. 当文件到达时,创建一个以文件 ID 命名的新负载 table。
  2. 从加载到最终处理您的数据table。
  3. 降低正在处理的文件的负载 table。

这有点类似于使用 2 tables(加载和暂存)的其他解决方案,但即使在该解决方案中,您仍然只能拥有 2 个文件 "loaded"(您仍然只应用一个文件到最后 table 不过?)

最后,不清楚您的 "Elastic Job" 是否与实际的 "load" pipeline/processing 分离,或者是否包含在内。作为一份工作,我假设它不包括在内,如果是一份工作,你一次只能运行一个实例?因此,如果您一次只能将一个文件从加载移动到最终,那么不清楚为什么一次加载多个文件很重要。为什么急于加载文件?