我如何确保我不会 运行 进入这种竞争状态?

How can I ensure I dont run into this race condition?

我当前的流程是运行,方式如下:

1.) 用户在前端app中输入一个URL进行分析

2.) 前端验证 URL 并在包含 URL

属性的 table 中创建 URL 的记录

3.) 前端creates/updates table 中的一行跟踪URL 处于处理的哪个阶段(每个URL 都有其自己的内部 ID)

3.A)状态码更新为"queued"status

---- Table 定义:

ID INT PRIMARY KEY,
StatusCode INT,
StatusDescription VARCHAR(MAX),
IsInitial BIT,
LastUpdated DATETIME

4.) 前端向包含提交的 URL 的内部 ID

的 Azure 存储队列发送消息

第一条消息发送到队列后------------>

4.A)在UI中创建了一个对象供用户点击(到"refresh"数据)

4.B) 用户在创建的对象上点击(很有可能会发生)(如果经过验证则立即点击)

4.C) 另一条消息被发送到包含 URL 的 ID

的队列

<--------------------------------

5.) 一个 azure webjob(后台任务)运行 不断获取这些消息并开始处理

6.) 网络作业确定此 URL 是否已准备好进行处理

..... 准备开始处理 if

..... 它退出如果

一旦确定可以继续...

a.) 如果进程出错,状态代码会更新以反映

b.) 一条新消息被推入队列重试

判断URL是否准备好解析的函数:

    private bool IsReadyToParse(int [ID])
    {
        using (var db = EntityFactory.GetInstance())
        {
            var item = db.ProcessStatus.FirstOrDefault(x => x.ID == [ID]);

            if (item == null || item.StatusCode > 1)
            {
                return false;
            }

            if (item.StatusCode == (int)ProcessStatusEnum.Error || item.LastUpdated == null)
            {
                item.LastUpdated = DateTime.Now;
                db.Entry(item).State = EntityState.Modified;
                db.SaveChanges();
                return true;
            }

            return ((DateTime)item.LastUpdated).AddMinutes(15) < DateTime.Now;
        }
    }

队列消息通过这个函数进入:

     // This function will get triggered/executed when a new message is written 
    // on an Azure Queue
    public static void ProcessQueueMessage([QueueTrigger("[queue]")] QueueItem item, TextWriter log)
    {
        Console.WriteLine("Item found! Starting services [Id: {0}]", item.ID);

        Agent agent = new Agent([ID], log);
        agent.StartProcessing();

        log.WriteLine([Item]);
    }

...现在的问题是这个连续 运行 webjob 一次可以接收多个消息(我想将其扩展到更多的 webjobs 从同一个队列读取道路)

如何确定函数 IsReadyToParse() 确实反映了当前的处理状态?

如果数据库正要将状态代码更新为 "in process",但另一个线程刚刚读取了状态代码并同意继续该过程怎么办?

这是我的基本解决方案,经过有限的开发人员测试...会随着我的进行而更新。

…… 使用此存储过程而不是 IsReadyToParse()

CREATE PROCEDURE dbo.usp_getIsReadyForProcess
@[ID] INT 
AS
BEGIN
 BEGIN TRY

  SET TRANSACTION ISOLATION LEVEL SERIALIZABLE
  BEGIN TRANSACTION

    DECLARE @lastUpdated DATETIME
    DECLARE @statusCode INT

    -- LOCK ROW UNTIL END OF TRANSACTION
    SET @lastUpdated = (SELECT LastUpdated FROM dbo.ProcessStatus WITH (ROWLOCK, HOLDLOCK) WHERE [ID] = @[ID])
    SET @statusCode = (SELECT StatusCode FROM dbo.ProcessStatus WHERE [ID] = @[ID])

    DECLARE @isReady BIT

    --If there is no row count
    IF @@ROWCOUNT = 0
    BEGIN
        SET @isReady = 0
    END

    -- If video is already in process
    ELSE IF @statusCode > 1
    BEGIN
        SET @isReady = 0
    END

    -- If this is the first time it is getting parsed
    ELSE IF @lastUpdated IS NULL
    BEGIN
        SET @isReady = 1

        --Update datetime field
        UPDATE dbo.ProcessStatus 
        SET LastUpdated = GETDATE()
        WHERE [ID] = @[ID]
    END

    -- If is isnt the initial parse and hasnt been 15 minutes yet
    ELSE IF GETDATE() < DATEADD(MINUTE, 15, @lastUpdated)
    BEGIN
        SET @isReady = 0
    END


    -- Anything else, and its a go 
    ELSE 
    BEGIN
        SET @isReady = 1
    END



    -- If were ready to start, update the status code
    IF @isReady = 1
    BEGIN
        UPDATE dbo.ProcessStatus 
        SET StatusCode = 2 
        WHERE [ID] = @[ID]
    END

 COMMIT TRANSACTION

 SELECT @isReady

 END TRY    
 BEGIN CATCH
    -- If there was any type of error
    ROLLBACK
    SELECT 0
 END CATCH 
END

这是一种可能的方法,类似于 WebJobs SDK 在内部所做的,以防止多个 webjob 函数同时处理同一个 blob 触发器。

当函数从队列中获取消息时,创建一个与消息中的 ID 同名的 blob。 blob 的内容是处理的状态(完成或进行中)。当一个函数想要处理具有该 ID 的消息时,它必须租用该 blob - 这保证了线程安全。那么:

  • 如果不能获取到least,其他人正在处理消息=>丢弃队列消息。
  • 如果它获得了租约但状态是"Done",有人已经处理了消息=>丢弃队列消息。
  • 如果它获得了租约并且状态为 "In progress",有人试图处理该消息但无法完成 => 使用该消息并再次处理。

如果处理一条消息可能需要超过 60 秒,您将需要一些额外的代码来续订 blob 租约,否则它会过期并且其他人可以接收它。