按顺序执行锁(使用ConcurrentQueue和锁)

Execute locks in order (using ConcurrentQueue and lock)

我在 API 中有一个处理数据并使用锁的端点。由于有很多用户使用系统,有时请求没有按顺序处理,如果有很多请求等待,可能会抛出超时。我们已经尝试了两种不同的方法,第一种是 class:

public class QueuedFunctions<T>
    {
        private readonly object _internalSyncronizer = new object();
        private readonly ConcurrentQueue<Func<T>> _funcQueue = new ConcurrentQueue<Func<T>>();

        public T Execute(Func<T> func)
        {
            _funcQueue.Enqueue(func);
            Console.WriteLine("Queuing: " + Thread.CurrentThread.ManagedThreadId);
            Console.WriteLine(Monitor.IsEntered(_internalSyncronizer));
            lock (_internalSyncronizer)
            {
                Console.WriteLine("Locked: " + Thread.CurrentThread.ManagedThreadId);
                Func<T> nextFunc;
                if (_funcQueue.TryDequeue(out nextFunc))
                {
                    return nextFunc();
                }
                else
                {
                    throw new Exception("Something is wrong. How come there is nothing in the queue?");
                }
            }
        }
    }

我用以下单元测试对此进行了测试:

var threadsQuantity = 50;

            for (int i = 0; i < threadsQuantity; i++)
            {
                var t = new Thread(async () =>
                {
                    Console.WriteLine("Started Thread: " + Thread.CurrentThread.ManagedThreadId);
                    await queuedActions.Execute(() =>
                    {
                        Console.WriteLine("Thread: " + Thread.CurrentThread.ManagedThreadId);
                        return Task.FromResult<object>(null);
                    });
                });

                t.Start();
            }

输出如下

Started Thread: 16
Started Thread: 13
Started Thread: 15
Started Thread: 17
Started Thread: 14
Started Thread: 18
Started Thread: 19
Started Thread: 20
Started Thread: 21
Queuing: 21
Locked: 21
Queuing: 13
Queuing: 18
Queuing: 15
Queuing: 16
Queuing: 17
Queuing: 14
Queuing: 19
Queuing: 20
Started Thread: 22
Queuing: 22
Thread: 21
Locked: 13
Thread: 13
Locked: 15
Started Thread: 23
Thread: 15
Queuing: 23
Locked: 23
Thread: 23
Locked: 18
Thread: 18
Locked: 17
Thread: 17
Locked: 16
Thread: 16
Locked: 14
Thread: 14
Locked: 19
Thread: 19
Locked: 20
Thread: 20
Locked: 22
Thread: 22
Started Thread: 24
Queuing: 24
Locked: 24
Thread: 24
Started Thread: 25
Queuing: 25
Locked: 25
Thread: 25
Started Thread: 26
Queuing: 26
Locked: 26
Thread: 26
Started Thread: 27
Queuing: 27
Locked: 27
Thread: 27
Started Thread: 28
Queuing: 28
Locked: 28
Thread: 28
Started Thread: 29
Queuing: 29
Locked: 29
Thread: 29
Started Thread: 30
Queuing: 30
Locked: 30
Thread: 30
Started Thread: 31
Queuing: 31
Locked: 31
Thread: 31
Started Thread: 32
Queuing: 32
Locked: 32
Thread: 32
Started Thread: 33
Queuing: 33
Locked: 33
Thread: 33
Started Thread: 34
Queuing: 34
Locked: 34
Thread: 34
Started Thread: 35
Queuing: 35
Locked: 35
Thread: 35
Started Thread: 36
Queuing: 36
Locked: 36
Thread: 36
Started Thread: 37
Queuing: 37
Locked: 37
Thread: 37
Started Thread: 38
Queuing: 38
Locked: 38
Thread: 38
Started Thread: 39
Queuing: 39
Locked: 39
Thread: 39
Started Thread: 40
Queuing: 40
Locked: 40
Thread: 40
Started Thread: 41
Queuing: 41
Locked: 41
Thread: 41
Started Thread: 42
Queuing: 42
Locked: 42
Thread: 42
Started Thread: 43
Queuing: 43
Locked: 43
Thread: 43
Started Thread: 44
Queuing: 44
Locked: 44
Thread: 44
Started Thread: 45
Queuing: 45
Locked: 45
Thread: 45
Started Thread: 46
Queuing: 46
Locked: 46
Thread: 46
Started Thread: 47
Queuing: 47
Locked: 47
Thread: 47
Started Thread: 48
Queuing: 48
Locked: 48
Thread: 48
Started Thread: 49
Queuing: 49
Locked: 49
Thread: 49
Started Thread: 50
Queuing: 50
Locked: 50
Thread: 50
Started Thread: 51
Queuing: 51
Locked: 51
Thread: 51
Started Thread: 52
Queuing: 52
Locked: 52
Thread: 52
Started Thread: 53
Queuing: 53
Locked: 53
Thread: 53
Started Thread: 54
Queuing: 54
Locked: 54
Thread: 54
Started Thread: 55
Queuing: 55
Locked: 55
Thread: 55
Started Thread: 56
Queuing: 56
Locked: 56
Thread: 56
Started Thread: 57
Queuing: 57
Locked: 57
Thread: 57
Started Thread: 58
Queuing: 58
Locked: 58
Thread: 58
Started Thread: 59
Queuing: 59
Locked: 59
Thread: 59
Started Thread: 60
Queuing: 60
Locked: 60
Thread: 60
Started Thread: 61
Queuing: 61
Locked: 61
Thread: 61
Started Thread: 62
Queuing: 62
Locked: 62
Thread: 62

如您所见,代码将线程 21 排队,锁定线程 21,然后将线程 13 排队,锁定线程 13,将线程 18 排队,然后锁定线程 15,这是下一个线程过程。

此外,我们还用在这个论坛上找到的另一个 class 进行了测试:

   public sealed class QueuedLock
{
    private class SyncObject : IDisposable
    {
        private Action m_action = null;
        private static readonly object locker = new object();

        public SyncObject(Action action)
        {
            m_action = action;
        }

        public void Dispose()
        {
            lock (locker)
            {
                var action = m_action;
                m_action = null;
                action?.Invoke();
            }
        }
    }

    private static readonly object m_innerLock = new Object();
    private volatile int m_ticketsCount = 0;
    private volatile int m_ticketToRide = 1;

    public bool Enter()
    {
        if (Monitor.IsEntered(m_innerLock))
        {
            return false;
        }

        var myTicket = Interlocked.Increment(ref m_ticketsCount);
        Monitor.Enter(m_innerLock);
        Console.WriteLine("Locked: " + Thread.CurrentThread.ManagedThreadId);
        while (true)
        {
            if (myTicket == m_ticketToRide)
            {
                return true;
            }

            Monitor.Wait(m_innerLock);
        }
    }

    public void Exit()
    {
        Interlocked.Increment(ref m_ticketToRide);
        Monitor.PulseAll(m_innerLock);
        Monitor.Exit(m_innerLock);
    }

    public IDisposable GetLock()
    {
        if (Enter())
        {
            return new SyncObject(Exit);
        }

        return new SyncObject(null);
    }
}

测试方法:

static QueuedLock queuedLock = new QueuedLock();
        [TestMethod]
        public void QueuedLockTest_ShouldRunInOrder()
        {
            var threadsQuantity = 50;

            for (int i = 0; i < threadsQuantity; i++)
            {
                var t = new Thread(() =>
                {
                    Console.WriteLine("Started Thread: " + Thread.CurrentThread.ManagedThreadId);
                    try
                    {
                        queuedLock.Enter();
                        Console.WriteLine("Thread: " + Thread.CurrentThread.ManagedThreadId);
                    }
                    finally
                    {
                        queuedLock.Exit();
                    }
                });

                t.Start();
            }
        }

而且输出也没有排序:

    Started Thread: 15
Started Thread: 14
Started Thread: 13
Started Thread: 16
Started Thread: 17
Started Thread: 18
Locked: 18
Started Thread: 19
Thread: 18
Locked: 15
Thread: 15
Locked: 14
Locked: 13
Locked: 19
Thread: 19
Locked: 16
Thread: 14
Locked: 17
Thread: 13
Thread: 16
Thread: 17
Started Thread: 20
Locked: 20
Thread: 20
Started Thread: 21
Locked: 21
Thread: 21
Started Thread: 22
Locked: 22
Thread: 22
Started Thread: 23
Locked: 23
Thread: 23
Started Thread: 24
Locked: 24
Thread: 24
Started Thread: 25
Locked: 25
Thread: 25
Started Thread: 26
Locked: 26
Thread: 26
Started Thread: 27
Locked: 27
Thread: 27
Started Thread: 28
Locked: 28
Thread: 28
Started Thread: 29
Locked: 29
Thread: 29
Started Thread: 30
Locked: 30
Thread: 30
Started Thread: 31
Locked: 31
Thread: 31
Started Thread: 32
Locked: 32
Thread: 32
Started Thread: 33
Locked: 33
Thread: 33
Started Thread: 34
Locked: 34
Thread: 34
Started Thread: 35
Locked: 35
Thread: 35
Started Thread: 36
Locked: 36
Thread: 36
Started Thread: 37
Locked: 37
Thread: 37
Started Thread: 38
Locked: 38
Thread: 38
Started Thread: 39
Locked: 39
Thread: 39
Started Thread: 40
Locked: 40
Thread: 40
Started Thread: 41
Locked: 41
Thread: 41
Started Thread: 42
Locked: 42
Thread: 42
Started Thread: 43
Locked: 43
Thread: 43
Started Thread: 44
Locked: 44
Thread: 44
Started Thread: 45
Locked: 45
Thread: 45
Started Thread: 46
Locked: 46
Thread: 46
Started Thread: 47
Locked: 47
Thread: 47
Started Thread: 48
Locked: 48
Thread: 48
Started Thread: 49
Locked: 49
Thread: 49
Started Thread: 50
Locked: 50
Thread: 50
Started Thread: 51
Locked: 51
Thread: 51
Started Thread: 52
Locked: 52
Thread: 52
Started Thread: 53
Locked: 53
Thread: 53
Started Thread: 54
Locked: 54
Thread: 54
Started Thread: 55
Locked: 55
Thread: 55
Started Thread: 56
Locked: 56
Thread: 56
Started Thread: 57
Locked: 57
Thread: 57
Started Thread: 58
Locked: 58
Thread: 58
Started Thread: 59
Locked: 59
Thread: 59
Started Thread: 60
Locked: 60
Thread: 60
Started Thread: 61
Locked: 61
Thread: 61
Started Thread: 62
Locked: 62
Thread: 62

你能帮我让它工作吗?我无法在这些函数中发现错误,而且它似乎对其他人也有效。

您的测试似乎没有生成可用于证明或反驳正确执行顺序的信息。 Thread.CurrentThread.ManagedThreadId 只是一个 ID,并不是可靠的顺序指示。在程序 运行 时写入 Console 可能会影响执行顺序,因为对静态 Console class 的所有访问都是同步的。

下面的测试使用自定义 QueuedLock class(已修改)来确保执行顺序,并一致地报告 FIFO 行为。我在启动每个线程之前添加了一个 Thread.Sleep(20) ,以避免两个线程大致同时请求票证。 ThreadStart 委托中还有一个 Thread.Sleep(40),以确保所有线程在进入 QueuedLock 之前都会被阻塞(每个批次的第一个线程除外)。

public static void Main()
{
    QueuedLock queuedLock = new QueuedLock();
    foreach (var batch in Enumerable.Range(1, 10))
    {
        var sequence = Enumerable.Range(1, 10);
        var tickets = new List<int>();
        var threads = sequence.Select(item =>
        {
            Thread.Sleep(20);
            var thread = new Thread(() =>
            {
                queuedLock.Enter();
                try { tickets.Add(item); Thread.Sleep(40); }
                finally { queuedLock.Exit(); }
            });
            thread.Start();
            return thread;
        }).ToArray();
        foreach (var thread in threads) thread.Join();
        Console.WriteLine($"Batch #{batch}, tickets: {String.Join(", ", tickets)} - "
            + (tickets.SequenceEqual(sequence) ? "OK" : "NOT FIFO!"));
    }
}

public class QueuedLock
{
    private readonly object _locker = new object();
    private int _ticketsCount = 0;
    private int _ticketToRide = 1;

    public void Enter()
    {
        if (Monitor.IsEntered(_locker)) throw new InvalidOperationException();
        int myTicket = Interlocked.Increment(ref _ticketsCount);
        Monitor.Enter(_locker);
        while (myTicket != _ticketToRide) Monitor.Wait(_locker);
    }

    public void Exit()
    {
        if (!Monitor.IsEntered(_locker)) throw new InvalidOperationException();
        _ticketToRide++;
        Monitor.PulseAll(_locker);
        Monitor.Exit(_locker);
    }
}

输出:

Batch #1, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #2, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #3, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #4, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #5, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #6, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #7, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #8, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #9, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #10, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK

Try it on Fiddle.