尝试测试一个 FIFO 互斥锁——如果我在一个循环中启动测试线程,它就不起作用,但如果我以 1 毫秒的间隔启动它们,它就可以工作
Trying to test a FIFO Mutex - it doesn't work if I start the testing threads in one loop, but it does work if I start them 1 ms apart
我一直在使用来自this answer的排队锁码,并为它写了一个单元测试。
供参考,锁码:
public sealed class FifoMutex
{
private readonly object innerLock = new object();
private volatile int ticketsCount = 0;
private volatile int ticketToRide = 1;
private readonly ThreadLocal<int> reenter = new ThreadLocal<int>();
public void Enter()
{
reenter.Value++;
if (reenter.Value > 1)
return;
int myTicket = Interlocked.Increment(ref ticketsCount);
Monitor.Enter(innerLock);
while (true)
{
if (myTicket == ticketToRide)
{
return;
}
else
{
Monitor.Wait(innerLock);
}
}
}
public void Exit()
{
if (reenter.Value > 0)
reenter.Value--;
if (reenter.Value > 0)
return;
Interlocked.Increment(ref ticketToRide);
Monitor.PulseAll(innerLock);
Monitor.Exit(innerLock);
}
}
还有我的测试代码:
[TestClass]
public class FifoMutexTests
{
public static ConcurrentQueue<string> Queue;
[TestInitialize]
public void Setup()
{
Queue = new ConcurrentQueue<string>();
}
[TestCleanup]
public void TearDown()
{
Queue = null;
}
[TestMethod]
public void TestFifoMutex()
{
int noOfThreads = 10;
int[] threadSleepTimes = new int[noOfThreads];
string[] threadNames = new string[noOfThreads];
Random r = new Random();
for (int i = 0; i < noOfThreads; i++)
{
threadSleepTimes[i] = r.Next(0, 250);
threadNames[i] = "Thread " + i;
}
for (int i = 0; i < noOfThreads; i++)
{
FifoMutexTestUser user = new FifoMutexTestUser();
Thread newThread = new Thread(user.DoWork);
newThread.Name = threadNames[i];
newThread.Start(threadSleepTimes[i]);
}
Thread.Sleep(3000);
var receivedThreadNamesInOrder = Queue.ToArray();
Assert.AreEqual(threadNames.Length, receivedThreadNamesInOrder.Length);
for (int i = 0; i < receivedThreadNamesInOrder.Length; i++)
{
Assert.AreEqual(threadNames[i], receivedThreadNamesInOrder[i]);
}
}
}
使用此测试互斥用户:
public class FifoMutexTestUser
{
private readonly static FifoMutex fifoMutex = new FifoMutex();
public void DoWork(object sleepTime)
{
try
{
fifoMutex.Enter();
Thread.Sleep((int)sleepTime);
FifoMutexTests.Queue.Enqueue(Thread.CurrentThread.Name);
}
finally
{
fifoMutex.Exit();
}
}
}
本质上,我创建了十个线程,每个线程都会随机休眠一段时间,然后在主测试中将它们的名字排入静态并发队列class。这些线程是从同一用户 class 的不同实例构建的,它具有静态 fifo 互斥体 属性。这个场景类似于我自己的用例(我有多个消费者 classes 从不同的地方接收消息,我需要我的后端严格按顺序处理它们,但也严格按照它们到达的顺序)。
但是这个测试不起作用。所有线程都将其所有名称排入队列,但顺序不正确。从第二个片段的最后一个 for 循环中,我读到它们实际上是按随机顺序执行的,这正是 fifo 互斥锁要防止的。
但事情是这样的。对我的测试代码进行了一个小的调整,一切都很顺利。
for (int i = 0; i < noOfThreads; i++)
{
FifoMutexTestUser user = new FifoMutexTestUser();
Thread newThread = new Thread(user.DoWork);
Thread.Sleep(1);
newThread.Name = threadNames[i];
newThread.Start(threadSleepTimes[i]);
}
现在我在启动所有线程的循环中休眠一毫秒,这是可能的最小间隔(第二个片段的第二个循环)。如果我这样做,那么我所有的线程都会以正确的顺序排列它们的名字,并且我的测试 100% 成功。
所以我想知道为什么这么短的睡眠时间会有所不同。我对编译不是很了解,但我的第一个猜测是编译器正在编译或优化启动所有线程的循环,并且在该过程中线程的顺序发生变化?
或者,(可能更有可能的)替代方案是我的测试代码(或互斥锁代码)有问题吗?
似乎(如果我理解正确的话)您假设线程将实际启动(DoWork
将在这种情况下执行)并按照您在其上调用 Thread.Start
的相同顺序获取互斥量。然而,这不是(必要的)情况。
假设您有 10 个线程(“ids”从 1 到 10),然后您按顺序对它们调用 Thead.Start
- 不 意味着它们实际上将按该顺序开始。您在线程 1 上调用 start,然后在线程 2 上调用 start,然后线程 2(不是 1)的 DoWork
可能先执行。您可以通过以这种方式更改测试代码来观察这一点:
public class FifoMutexTestUser {
private readonly int _id;
public FifoMutexTestUser(int id) {
_id = id;
}
private readonly static FifoMutex fifoMutex = new FifoMutex();
public void DoWork(object sleepTime)
{
Console.WriteLine("Thread started: " + _id);
try
{
fifoMutex.Enter();
Thread.Sleep((int)sleepTime);
FifoMutexTests.Queue.Enqueue(Thread.CurrentThread.Name);
}
finally
{
fifoMutex.Exit();
}
}
}
然后在那里传递循环变量(与您执行断言的 threadNames
相关):
for (int i = 0; i < noOfThreads; i++)
{
FifoMutexTestUser user = new FifoMutexTestUser(i);
Thread newThread = new Thread(user.DoWork);
newThread.Name = threadNames[i];
newThread.Start(threadSleepTimes[i]);
}
你可以看到这样的东西(当然结果可能会有所不同):
Thread started: 9
Thread started: 1
Thread started: 0
Thread started: 2
Thread started: 3
Thread started: 4
Thread started: 5
Thread started: 6
Thread started: 7
Thread started: 8
因此在此 运行 中,您最后调用 Thread.Start
的线程实际上最先启动。但更重要的是 - 如果线程首先启动(这里启动我们的意思是 DoWork
开始执行) - 这并不意味着它会首先获取互斥锁,因为线程并行执行并且代码在 fifoMutex.Enter
之外并且 fifoMutex.Exit
(以及在互斥量之前和之后的函数内部实际上是 acquired\released)不受任何同步结构的保护 - 任何线程都可以首先获取互斥量。
有时(并非总是)添加延迟会给前一个(在循环中)线程带来优势,因此它有更多机会首先实际获取互斥锁。如果你很幸运,线程试图以正确的顺序获取互斥量,那么你的 FifoMutex
确保它们将按该顺序解除阻塞。但是他们获取互斥锁的顺序在你的测试代码中是不确定的。
我一直在使用来自this answer的排队锁码,并为它写了一个单元测试。 供参考,锁码:
public sealed class FifoMutex
{
private readonly object innerLock = new object();
private volatile int ticketsCount = 0;
private volatile int ticketToRide = 1;
private readonly ThreadLocal<int> reenter = new ThreadLocal<int>();
public void Enter()
{
reenter.Value++;
if (reenter.Value > 1)
return;
int myTicket = Interlocked.Increment(ref ticketsCount);
Monitor.Enter(innerLock);
while (true)
{
if (myTicket == ticketToRide)
{
return;
}
else
{
Monitor.Wait(innerLock);
}
}
}
public void Exit()
{
if (reenter.Value > 0)
reenter.Value--;
if (reenter.Value > 0)
return;
Interlocked.Increment(ref ticketToRide);
Monitor.PulseAll(innerLock);
Monitor.Exit(innerLock);
}
}
还有我的测试代码:
[TestClass]
public class FifoMutexTests
{
public static ConcurrentQueue<string> Queue;
[TestInitialize]
public void Setup()
{
Queue = new ConcurrentQueue<string>();
}
[TestCleanup]
public void TearDown()
{
Queue = null;
}
[TestMethod]
public void TestFifoMutex()
{
int noOfThreads = 10;
int[] threadSleepTimes = new int[noOfThreads];
string[] threadNames = new string[noOfThreads];
Random r = new Random();
for (int i = 0; i < noOfThreads; i++)
{
threadSleepTimes[i] = r.Next(0, 250);
threadNames[i] = "Thread " + i;
}
for (int i = 0; i < noOfThreads; i++)
{
FifoMutexTestUser user = new FifoMutexTestUser();
Thread newThread = new Thread(user.DoWork);
newThread.Name = threadNames[i];
newThread.Start(threadSleepTimes[i]);
}
Thread.Sleep(3000);
var receivedThreadNamesInOrder = Queue.ToArray();
Assert.AreEqual(threadNames.Length, receivedThreadNamesInOrder.Length);
for (int i = 0; i < receivedThreadNamesInOrder.Length; i++)
{
Assert.AreEqual(threadNames[i], receivedThreadNamesInOrder[i]);
}
}
}
使用此测试互斥用户:
public class FifoMutexTestUser
{
private readonly static FifoMutex fifoMutex = new FifoMutex();
public void DoWork(object sleepTime)
{
try
{
fifoMutex.Enter();
Thread.Sleep((int)sleepTime);
FifoMutexTests.Queue.Enqueue(Thread.CurrentThread.Name);
}
finally
{
fifoMutex.Exit();
}
}
}
本质上,我创建了十个线程,每个线程都会随机休眠一段时间,然后在主测试中将它们的名字排入静态并发队列class。这些线程是从同一用户 class 的不同实例构建的,它具有静态 fifo 互斥体 属性。这个场景类似于我自己的用例(我有多个消费者 classes 从不同的地方接收消息,我需要我的后端严格按顺序处理它们,但也严格按照它们到达的顺序)。
但是这个测试不起作用。所有线程都将其所有名称排入队列,但顺序不正确。从第二个片段的最后一个 for 循环中,我读到它们实际上是按随机顺序执行的,这正是 fifo 互斥锁要防止的。
但事情是这样的。对我的测试代码进行了一个小的调整,一切都很顺利。
for (int i = 0; i < noOfThreads; i++)
{
FifoMutexTestUser user = new FifoMutexTestUser();
Thread newThread = new Thread(user.DoWork);
Thread.Sleep(1);
newThread.Name = threadNames[i];
newThread.Start(threadSleepTimes[i]);
}
现在我在启动所有线程的循环中休眠一毫秒,这是可能的最小间隔(第二个片段的第二个循环)。如果我这样做,那么我所有的线程都会以正确的顺序排列它们的名字,并且我的测试 100% 成功。
所以我想知道为什么这么短的睡眠时间会有所不同。我对编译不是很了解,但我的第一个猜测是编译器正在编译或优化启动所有线程的循环,并且在该过程中线程的顺序发生变化?
或者,(可能更有可能的)替代方案是我的测试代码(或互斥锁代码)有问题吗?
似乎(如果我理解正确的话)您假设线程将实际启动(DoWork
将在这种情况下执行)并按照您在其上调用 Thread.Start
的相同顺序获取互斥量。然而,这不是(必要的)情况。
假设您有 10 个线程(“ids”从 1 到 10),然后您按顺序对它们调用 Thead.Start
- 不 意味着它们实际上将按该顺序开始。您在线程 1 上调用 start,然后在线程 2 上调用 start,然后线程 2(不是 1)的 DoWork
可能先执行。您可以通过以这种方式更改测试代码来观察这一点:
public class FifoMutexTestUser {
private readonly int _id;
public FifoMutexTestUser(int id) {
_id = id;
}
private readonly static FifoMutex fifoMutex = new FifoMutex();
public void DoWork(object sleepTime)
{
Console.WriteLine("Thread started: " + _id);
try
{
fifoMutex.Enter();
Thread.Sleep((int)sleepTime);
FifoMutexTests.Queue.Enqueue(Thread.CurrentThread.Name);
}
finally
{
fifoMutex.Exit();
}
}
}
然后在那里传递循环变量(与您执行断言的 threadNames
相关):
for (int i = 0; i < noOfThreads; i++)
{
FifoMutexTestUser user = new FifoMutexTestUser(i);
Thread newThread = new Thread(user.DoWork);
newThread.Name = threadNames[i];
newThread.Start(threadSleepTimes[i]);
}
你可以看到这样的东西(当然结果可能会有所不同):
Thread started: 9
Thread started: 1
Thread started: 0
Thread started: 2
Thread started: 3
Thread started: 4
Thread started: 5
Thread started: 6
Thread started: 7
Thread started: 8
因此在此 运行 中,您最后调用 Thread.Start
的线程实际上最先启动。但更重要的是 - 如果线程首先启动(这里启动我们的意思是 DoWork
开始执行) - 这并不意味着它会首先获取互斥锁,因为线程并行执行并且代码在 fifoMutex.Enter
之外并且 fifoMutex.Exit
(以及在互斥量之前和之后的函数内部实际上是 acquired\released)不受任何同步结构的保护 - 任何线程都可以首先获取互斥量。
有时(并非总是)添加延迟会给前一个(在循环中)线程带来优势,因此它有更多机会首先实际获取互斥锁。如果你很幸运,线程试图以正确的顺序获取互斥量,那么你的 FifoMutex
确保它们将按该顺序解除阻塞。但是他们获取互斥锁的顺序在你的测试代码中是不确定的。