我如何确保我不会 运行 进入这种竞争状态?
How can I ensure I dont run into this race condition?
我当前的流程是运行,方式如下:
1.) 用户在前端app中输入一个URL进行分析
2.) 前端验证 URL 并在包含 URL
属性的 table 中创建 URL 的记录
3.) 前端creates/updates table 中的一行跟踪URL 处于处理的哪个阶段(每个URL 都有其自己的内部 ID)
3.A)状态码更新为"queued"status
---- Table 定义:
ID INT PRIMARY KEY,
StatusCode INT,
StatusDescription VARCHAR(MAX),
IsInitial BIT,
LastUpdated DATETIME
4.) 前端向包含提交的 URL 的内部 ID
的 Azure 存储队列发送消息
第一条消息发送到队列后------------>
4.A)在UI中创建了一个对象供用户点击(到"refresh"数据)
4.B) 用户在创建的对象上点击(很有可能会发生)(如果经过验证则立即点击)
4.C) 另一条消息被发送到包含 URL 的 ID
的队列
<--------------------------------
5.) 一个 azure webjob(后台任务)运行 不断获取这些消息并开始处理
6.) 网络作业确定此 URL 是否已准备好进行处理
..... 准备开始处理 if
- 它是新的(LastUpdated 字段为空)
- 附加到项目的状态代码表示错误
- 距离上次更新已经 15 分钟了
..... 它退出如果
- 消息中的 ID 无效
- 附加的状态代码表示当前已经在处理中
- 距离上次更新不到 15 分钟
一旦确定可以继续...
- 如果是新的,webjob 会将 LastUpdated 更新为 Datetime.Now
- 在流程的每个步骤开始时,状态代码都会更新以反映这一点
在进程的最后,LastUpdated 更新为当前时间
一个try catch包围了进程
a.) 如果进程出错,状态代码会更新以反映
b.) 一条新消息被推入队列重试
判断URL是否准备好解析的函数:
private bool IsReadyToParse(int [ID])
{
using (var db = EntityFactory.GetInstance())
{
var item = db.ProcessStatus.FirstOrDefault(x => x.ID == [ID]);
if (item == null || item.StatusCode > 1)
{
return false;
}
if (item.StatusCode == (int)ProcessStatusEnum.Error || item.LastUpdated == null)
{
item.LastUpdated = DateTime.Now;
db.Entry(item).State = EntityState.Modified;
db.SaveChanges();
return true;
}
return ((DateTime)item.LastUpdated).AddMinutes(15) < DateTime.Now;
}
}
队列消息通过这个函数进入:
// This function will get triggered/executed when a new message is written
// on an Azure Queue
public static void ProcessQueueMessage([QueueTrigger("[queue]")] QueueItem item, TextWriter log)
{
Console.WriteLine("Item found! Starting services [Id: {0}]", item.ID);
Agent agent = new Agent([ID], log);
agent.StartProcessing();
log.WriteLine([Item]);
}
...现在的问题是这个连续 运行 webjob 一次可以接收多个消息(我想将其扩展到更多的 webjobs 从同一个队列读取道路)
如何确定函数 IsReadyToParse() 确实反映了当前的处理状态?
如果数据库正要将状态代码更新为 "in process",但另一个线程刚刚读取了状态代码并同意继续该过程怎么办?
这是我的基本解决方案,经过有限的开发人员测试...会随着我的进行而更新。
……
使用此存储过程而不是 IsReadyToParse()
CREATE PROCEDURE dbo.usp_getIsReadyForProcess
@[ID] INT
AS
BEGIN
BEGIN TRY
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE
BEGIN TRANSACTION
DECLARE @lastUpdated DATETIME
DECLARE @statusCode INT
-- LOCK ROW UNTIL END OF TRANSACTION
SET @lastUpdated = (SELECT LastUpdated FROM dbo.ProcessStatus WITH (ROWLOCK, HOLDLOCK) WHERE [ID] = @[ID])
SET @statusCode = (SELECT StatusCode FROM dbo.ProcessStatus WHERE [ID] = @[ID])
DECLARE @isReady BIT
--If there is no row count
IF @@ROWCOUNT = 0
BEGIN
SET @isReady = 0
END
-- If video is already in process
ELSE IF @statusCode > 1
BEGIN
SET @isReady = 0
END
-- If this is the first time it is getting parsed
ELSE IF @lastUpdated IS NULL
BEGIN
SET @isReady = 1
--Update datetime field
UPDATE dbo.ProcessStatus
SET LastUpdated = GETDATE()
WHERE [ID] = @[ID]
END
-- If is isnt the initial parse and hasnt been 15 minutes yet
ELSE IF GETDATE() < DATEADD(MINUTE, 15, @lastUpdated)
BEGIN
SET @isReady = 0
END
-- Anything else, and its a go
ELSE
BEGIN
SET @isReady = 1
END
-- If were ready to start, update the status code
IF @isReady = 1
BEGIN
UPDATE dbo.ProcessStatus
SET StatusCode = 2
WHERE [ID] = @[ID]
END
COMMIT TRANSACTION
SELECT @isReady
END TRY
BEGIN CATCH
-- If there was any type of error
ROLLBACK
SELECT 0
END CATCH
END
这是一种可能的方法,类似于 WebJobs SDK 在内部所做的,以防止多个 webjob 函数同时处理同一个 blob 触发器。
当函数从队列中获取消息时,创建一个与消息中的 ID 同名的 blob。 blob 的内容是处理的状态(完成或进行中)。当一个函数想要处理具有该 ID 的消息时,它必须租用该 blob - 这保证了线程安全。那么:
- 如果不能获取到least,其他人正在处理消息=>丢弃队列消息。
- 如果它获得了租约但状态是"Done",有人已经处理了消息=>丢弃队列消息。
- 如果它获得了租约并且状态为 "In progress",有人试图处理该消息但无法完成 => 使用该消息并再次处理。
如果处理一条消息可能需要超过 60 秒,您将需要一些额外的代码来续订 blob 租约,否则它会过期并且其他人可以接收它。
我当前的流程是运行,方式如下:
1.) 用户在前端app中输入一个URL进行分析
2.) 前端验证 URL 并在包含 URL
属性的 table 中创建 URL 的记录3.) 前端creates/updates table 中的一行跟踪URL 处于处理的哪个阶段(每个URL 都有其自己的内部 ID)
3.A)状态码更新为"queued"status
---- Table 定义:
ID INT PRIMARY KEY,
StatusCode INT,
StatusDescription VARCHAR(MAX),
IsInitial BIT,
LastUpdated DATETIME
4.) 前端向包含提交的 URL 的内部 ID
的 Azure 存储队列发送消息第一条消息发送到队列后------------>
4.A)在UI中创建了一个对象供用户点击(到"refresh"数据)
4.B) 用户在创建的对象上点击(很有可能会发生)(如果经过验证则立即点击)
4.C) 另一条消息被发送到包含 URL 的 ID
的队列<--------------------------------
5.) 一个 azure webjob(后台任务)运行 不断获取这些消息并开始处理
6.) 网络作业确定此 URL 是否已准备好进行处理
..... 准备开始处理 if
- 它是新的(LastUpdated 字段为空)
- 附加到项目的状态代码表示错误
- 距离上次更新已经 15 分钟了
..... 它退出如果
- 消息中的 ID 无效
- 附加的状态代码表示当前已经在处理中
- 距离上次更新不到 15 分钟
一旦确定可以继续...
- 如果是新的,webjob 会将 LastUpdated 更新为 Datetime.Now
- 在流程的每个步骤开始时,状态代码都会更新以反映这一点
在进程的最后,LastUpdated 更新为当前时间
一个try catch包围了进程
a.) 如果进程出错,状态代码会更新以反映
b.) 一条新消息被推入队列重试
判断URL是否准备好解析的函数:
private bool IsReadyToParse(int [ID])
{
using (var db = EntityFactory.GetInstance())
{
var item = db.ProcessStatus.FirstOrDefault(x => x.ID == [ID]);
if (item == null || item.StatusCode > 1)
{
return false;
}
if (item.StatusCode == (int)ProcessStatusEnum.Error || item.LastUpdated == null)
{
item.LastUpdated = DateTime.Now;
db.Entry(item).State = EntityState.Modified;
db.SaveChanges();
return true;
}
return ((DateTime)item.LastUpdated).AddMinutes(15) < DateTime.Now;
}
}
队列消息通过这个函数进入:
// This function will get triggered/executed when a new message is written
// on an Azure Queue
public static void ProcessQueueMessage([QueueTrigger("[queue]")] QueueItem item, TextWriter log)
{
Console.WriteLine("Item found! Starting services [Id: {0}]", item.ID);
Agent agent = new Agent([ID], log);
agent.StartProcessing();
log.WriteLine([Item]);
}
...现在的问题是这个连续 运行 webjob 一次可以接收多个消息(我想将其扩展到更多的 webjobs 从同一个队列读取道路)
如何确定函数 IsReadyToParse() 确实反映了当前的处理状态?
如果数据库正要将状态代码更新为 "in process",但另一个线程刚刚读取了状态代码并同意继续该过程怎么办?
这是我的基本解决方案,经过有限的开发人员测试...会随着我的进行而更新。
…… 使用此存储过程而不是 IsReadyToParse()
CREATE PROCEDURE dbo.usp_getIsReadyForProcess
@[ID] INT
AS
BEGIN
BEGIN TRY
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE
BEGIN TRANSACTION
DECLARE @lastUpdated DATETIME
DECLARE @statusCode INT
-- LOCK ROW UNTIL END OF TRANSACTION
SET @lastUpdated = (SELECT LastUpdated FROM dbo.ProcessStatus WITH (ROWLOCK, HOLDLOCK) WHERE [ID] = @[ID])
SET @statusCode = (SELECT StatusCode FROM dbo.ProcessStatus WHERE [ID] = @[ID])
DECLARE @isReady BIT
--If there is no row count
IF @@ROWCOUNT = 0
BEGIN
SET @isReady = 0
END
-- If video is already in process
ELSE IF @statusCode > 1
BEGIN
SET @isReady = 0
END
-- If this is the first time it is getting parsed
ELSE IF @lastUpdated IS NULL
BEGIN
SET @isReady = 1
--Update datetime field
UPDATE dbo.ProcessStatus
SET LastUpdated = GETDATE()
WHERE [ID] = @[ID]
END
-- If is isnt the initial parse and hasnt been 15 minutes yet
ELSE IF GETDATE() < DATEADD(MINUTE, 15, @lastUpdated)
BEGIN
SET @isReady = 0
END
-- Anything else, and its a go
ELSE
BEGIN
SET @isReady = 1
END
-- If were ready to start, update the status code
IF @isReady = 1
BEGIN
UPDATE dbo.ProcessStatus
SET StatusCode = 2
WHERE [ID] = @[ID]
END
COMMIT TRANSACTION
SELECT @isReady
END TRY
BEGIN CATCH
-- If there was any type of error
ROLLBACK
SELECT 0
END CATCH
END
这是一种可能的方法,类似于 WebJobs SDK 在内部所做的,以防止多个 webjob 函数同时处理同一个 blob 触发器。
当函数从队列中获取消息时,创建一个与消息中的 ID 同名的 blob。 blob 的内容是处理的状态(完成或进行中)。当一个函数想要处理具有该 ID 的消息时,它必须租用该 blob - 这保证了线程安全。那么:
- 如果不能获取到least,其他人正在处理消息=>丢弃队列消息。
- 如果它获得了租约但状态是"Done",有人已经处理了消息=>丢弃队列消息。
- 如果它获得了租约并且状态为 "In progress",有人试图处理该消息但无法完成 => 使用该消息并再次处理。
如果处理一条消息可能需要超过 60 秒,您将需要一些额外的代码来续订 blob 租约,否则它会过期并且其他人可以接收它。