尝试在自定义消息传递系统(队列)中锁定记录时遇到并发问题

Having problems with concurrency while trying to lock a record in custom messaging system (Queue)

我们正在构建自己的自定义消息传递系统,但存在并发问题。规则如下:

  1. 进程 (EXE) 控制台应用程序锁定了 3 条记录并且 returns 它们

  2. 没有其他正在运行的进程(我们有 5 个 EXE 正在运行)可以选择其他进程已经占用的任何记录。

就这么简单,但是,我还是很疑惑。

SQL SPROC 执行 "Lock And Peek" 的摘要:

这背后的想法是我们保留三个 "NEW" 记录并将它们的状态更改为 "IN PROGRESS",并在 SELECT 和 UPDATE 语句上使用 ROWLOCK。所以理论上这些记录应该为一个进程锁定,这样其他进程就不能更新甚至 select 它们。有人可以告诉我我做错了什么吗?

ALTER PROCEDURE [dbo].[LockAndPeek] 
    @Count INT,
    @QueueTypeId INT
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @ListofIDs TABLE(ID INT);
    DECLARE @StatusIDInProgress INT

    SELECT @StatusIDInProgress = ID FROM QueueStatuses (NOLOCK)
    WHERE Name = 'In Progress'

    INSERT INTO @ListofIDs
    (ID)
    SELECT TOP (@Count) Q.ID 
    FROM
        Queues Q (ROWLOCK) INNER JOIN
        QueueStatuses QS (ROWLOCK) ON Q.StatusID = QS.ID
    WHERE
        QS.Name IN ('New', 'Errored') AND
        Q.TypeID = @QueueTypeID AND
        Q.AvailableTime IS NOT NULL AND
        Q.AvailableTime <= GETUTCDATE()
    ORDER BY Q.ID

    UPDATE Q WITH (ROWLOCK)
    SET 
        STATUSID = @StatusIDInProgress,
        PROCESSED = GETUTCDATE()
    FROM
        Queues Q (ROWLOCK) INNER JOIN
        QueueStatuses QS (ROWLOCK) ON Q.StatusID = QS.ID INNER JOIN
        @ListofIDs LI ON Q.ID = LI.ID
    WHERE
        QS.Name IN ('New', 'Errored')

    SELECT  Q.ID, 
            Q.AvailableTime,
            Q.NumberOfTries,
            Q.Created,
            Q.Processed,
            Q.ErrorData,
            QT.ID QueueTypeID,
            QT.Name QueueTypeName,
            QS.ID QueueStatusID,
            QS.Name QueueStatusName,
            Q.Message
    FROM 
        Queues Q (NOLOCK) INNER JOIN
        QueueStatuses QS (NOLOCK) ON Q.StatusID = QS.ID INNER JOIN
        QueueTypes QT (NOLOCK) ON Q.TypeId = QT.ID INNER JOIN
        @ListofIDs LI ON Q.ID = LI.ID

END

找到了!感谢@MaxVernon!

http://rusanu.com/2010/03/26/using-tables-as-queues/ 基本上是@MaxVernon 发表的评论的变体。我的查询看起来像这样,比以前简单得多:

ALTER PROCEDURE [dbo].[LockAndPeek] 
    @Count INT,
    @QueueTypeId INT
AS
BEGIN

SET XACT_ABORT ON; -- blow up the whole tran on any errors
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
BEGIN TRAN

UPDATE Q
SET 
    StatusID = 2, -- In Progress
    Processed = GETUTCDATE()
OUTPUT Inserted.*
FROM (
    SELECT TOP (@Count) * 
    FROM
        Queues WITH (READPAST, ROWLOCK)
    WHERE
        StatusID = 1 AND -- New 
        TypeID = @QueueTypeID AND
        AvailableTime IS NOT NULL AND
        AvailableTime <= GETUTCDATE()
   ORDER BY ID
) Q;

COMMIT TRAN;

END