尝试测试一个 FIFO 互斥锁——如果我在一个循环中启动测试线程,它就不起作用,但如果我以 1 毫秒的间隔启动它们,它就可以工作

Trying to test a FIFO Mutex - it doesn't work if I start the testing threads in one loop, but it does work if I start them 1 ms apart

我一直在使用来自this answer的排队锁码,并为它写了一个单元测试。 供参考,锁码:

public sealed class FifoMutex
{
    private readonly object innerLock = new object();
    private volatile int ticketsCount = 0;
    private volatile int ticketToRide = 1;
    private readonly ThreadLocal<int> reenter = new ThreadLocal<int>();

    public void Enter()
    {
        reenter.Value++;
        if (reenter.Value > 1)
            return;
        int myTicket = Interlocked.Increment(ref ticketsCount);
        Monitor.Enter(innerLock);
        while (true)
        {
            if (myTicket == ticketToRide)
            {
                return;
            }
            else
            {
                Monitor.Wait(innerLock);
            }
        }
    }

    public void Exit()
    {
        if (reenter.Value > 0)
            reenter.Value--;
        if (reenter.Value > 0)
            return;
        Interlocked.Increment(ref ticketToRide);
        Monitor.PulseAll(innerLock);
        Monitor.Exit(innerLock);
    }
}

还有我的测试代码:

[TestClass]
public class FifoMutexTests
{
    public static ConcurrentQueue<string> Queue;

    [TestInitialize]
    public void Setup()
    {
        Queue = new ConcurrentQueue<string>();
    }

    [TestCleanup]
    public void TearDown()
    {
        Queue = null;
    }

    [TestMethod]
    public void TestFifoMutex()
    {
        int noOfThreads = 10;
        int[] threadSleepTimes = new int[noOfThreads];
        string[] threadNames = new string[noOfThreads];
        Random r = new Random();
        for (int i = 0; i < noOfThreads; i++)
        {
            threadSleepTimes[i] = r.Next(0, 250);
            threadNames[i] = "Thread " + i;
        }

        for (int i = 0; i < noOfThreads; i++)
        {
            FifoMutexTestUser user = new FifoMutexTestUser();
            Thread newThread = new Thread(user.DoWork);
            newThread.Name = threadNames[i];
            newThread.Start(threadSleepTimes[i]);
        }
        Thread.Sleep(3000);

        var receivedThreadNamesInOrder = Queue.ToArray();
        Assert.AreEqual(threadNames.Length, receivedThreadNamesInOrder.Length);
        for (int i = 0; i < receivedThreadNamesInOrder.Length; i++)
        {
            Assert.AreEqual(threadNames[i], receivedThreadNamesInOrder[i]);
        }
    }
}

使用此测试互斥用户:

public class FifoMutexTestUser
{
    private readonly static FifoMutex fifoMutex = new FifoMutex();

    public void DoWork(object sleepTime)
    {
        try
        {
            fifoMutex.Enter();
            Thread.Sleep((int)sleepTime);
            FifoMutexTests.Queue.Enqueue(Thread.CurrentThread.Name);
        }
        finally
        {
            fifoMutex.Exit();
        }
    }
}

本质上,我创建了十个线程,每个线程都会随机休眠一段时间,然后在主测试中将它们的名字排入静态并发队列class。这些线程是从同一用户 class 的不同实例构建的,它具有静态 fifo 互斥体 属性。这个场景类似于我自己的用例(我有多个消费者 classes 从不同的地方接收消息,我需要我的后端严格按顺序处理它们,但也严格按照它们到达的顺序)。

但是这个测试不起作用。所有线程都将其所有名称排入队列,但顺序不正确。从第二个片段的最后一个 for 循环中,我读到它们实际上是按随机顺序执行的,这正是 fifo 互斥锁要防止的。

但事情是这样的。对我的测试代码进行了一个小的调整,一切都很顺利。

        for (int i = 0; i < noOfThreads; i++)
        {
            FifoMutexTestUser user = new FifoMutexTestUser();
            Thread newThread = new Thread(user.DoWork);
            Thread.Sleep(1);
            newThread.Name = threadNames[i];
            newThread.Start(threadSleepTimes[i]);
        }

现在我在启动所有线程的循环中休眠一毫秒,这是可能的最小间隔(第二个片段的第二个循环)。如果我这样做,那么我所有的线程都会以正确的顺序排列它们的名字,并且我的测试 100% 成功。

所以我想知道为什么这么短的睡眠时间会有所不同。我对编译不是很了解,但我的第一个猜测是编译器正在编译或优化启动所有线程的循环,并且在该过程中线程的顺序发生变化?

或者,(可能更有可能的)替代方案是我的测试代码(或互斥锁代码)有问题吗?

似乎(如果我理解正确的话)您假设线程将实际启动(DoWork 将在这种情况下执行)并按照您在其上调用 Thread.Start 的相同顺序获取互斥量。然而,这不是(必要的)情况。

假设您有 10 个线程(“ids”从 1 到 10),然后您按顺序对它们调用 Thead.Start - 意味着它们实际上将按该顺序开始。您在线程 1 上调用 start,然后在线程 2 上调用 start,然后线程 2(不是 1)的 DoWork 可能先执行。您可以通过以这种方式更改测试代码来观察这一点:

public class FifoMutexTestUser {
    private readonly int _id;
    public FifoMutexTestUser(int id) {
        _id = id;
    }
    private readonly static FifoMutex fifoMutex = new FifoMutex();

    public void DoWork(object sleepTime)
    {
        Console.WriteLine("Thread started: " + _id);
        try
        {
            fifoMutex.Enter();
            Thread.Sleep((int)sleepTime);
            FifoMutexTests.Queue.Enqueue(Thread.CurrentThread.Name);
        }
        finally
        {
            fifoMutex.Exit();
        }
    }
}

然后在那里传递循环变量(与您执行断言的 threadNames 相关):

for (int i = 0; i < noOfThreads; i++)
{
    FifoMutexTestUser user = new FifoMutexTestUser(i);
    Thread newThread = new Thread(user.DoWork);
    newThread.Name = threadNames[i];
    newThread.Start(threadSleepTimes[i]);
}

你可以看到这样的东西(当然结果可能会有所不同):

Thread started: 9
Thread started: 1
Thread started: 0
Thread started: 2
Thread started: 3
Thread started: 4
Thread started: 5
Thread started: 6
Thread started: 7
Thread started: 8

因此在此 运行 中,您最后调用 Thread.Start 的线程实际上最先启动。但更重要的是 - 如果线程首先启动(这里启动我们的意思是 DoWork 开始执行) - 这并不意味着它会首先获取互斥锁,因为线程并行执行并且代码在 fifoMutex.Enter 之外并且 fifoMutex.Exit(以及在互斥量之前和之后的函数内部实际上是 acquired\released)不受任何同步结构的保护 - 任何线程都可以首先获取互斥量。

有时(并非总是)添加延迟会给前一个(在循环中)线程带来优势,因此它有更多机会首先实际获取互斥锁。如果你很幸运,线程试图以正确的顺序获取互斥量,那么你的 FifoMutex 确保它们将按该顺序解除阻塞。但是他们获取互斥锁的顺序在你的测试代码中是不确定的。