使用信号量的 C# 生产者-消费者
C# Producer-Consumer using Semaphores
受到 The Little Book of Semaphores 的启发,我决定使用 Semaphores 来实现生产者-消费者问题。
我特别希望能够随意停止所有的Worker线程。
我已经广泛测试了我的方法论,没有发现任何错误。
以下代码是用于测试的原型,可以运行作为控制台应用程序:
using System;
using System.Collections.Concurrent;
using System.Threading;
using NUnit.Framework;
public class ProducerConsumer
{
private static readonly int _numThreads = 5;
private static readonly int _numItemsEnqueued = 10;
private static readonly Semaphore _workItems = new Semaphore(0, int.MaxValue);
private static readonly ManualResetEvent _stop = new ManualResetEvent(false);
private static ConcurrentQueue<int> _queue;
public static void Main()
{
_queue = new ConcurrentQueue<int>();
// Create and start threads.
for (int i = 1; i <= _numThreads; i++)
{
Thread t = new Thread(new ParameterizedThreadStart(Worker));
// Start the thread, passing the number.
t.Start(i);
}
// Wait for half a second, to allow all the
// threads to start and to block on the semaphore.
Thread.Sleep(500);
Console.WriteLine(string.Format("Main thread adds {0} items to the queue and calls Release() {0} times.", _numItemsEnqueued));
for (int i = 1; i <= _numItemsEnqueued; i++)
{
Console.WriteLine("Waking up a worker thread.");
_queue.Enqueue(i);
_workItems.Release(); //wake up 1 worker
Thread.Sleep(2000); //sleep 2 sec so it's clear the threads get unblocked 1 by 1
}
// sleep for 5 seconds to allow threads to exit
Thread.Sleep(5000);
Assert.True(_queue.Count == 0);
Console.WriteLine("Main thread stops all threads.");
_stop.Set();
// wait a while to exit
Thread.Sleep(5000);
Console.WriteLine("Main thread exits.");
Console.WriteLine(string.Format("Last value of Semaphore was {0}.", _workItems.Release()));
Assert.True(_queue.Count == 0);
Console.WriteLine("Press Enter to exit.");
Console.ReadLine();
}
private static void Worker(object num)
{
// Each worker thread begins by requesting the semaphore.
Console.WriteLine("Thread {0} begins and waits for the semaphore.", num);
WaitHandle[] wait = { _workItems, _stop };
int signal;
while (0 == (signal = WaitHandle.WaitAny(wait)))
{
Console.WriteLine("Thread {0} becomes unblocked by Release() and has work to do.", num);
int res;
if (_queue.TryDequeue(out res))
{
Console.WriteLine("Thread {0} dequeues {1}.", num, res);
}
else
{
throw new Exception("this should not happen.");
}
}
if (signal == 1)
Console.WriteLine("Thread {0} was stopped.", num);
Console.WriteLine("Thread {0} exits.", num);
}
}
现在对于我的问题,我使用 WaitHandle.WaitAny(semaphore)
的前提是当我在信号量上调用 Release()
时,只有 1 个 Worker 会被唤醒。但是,我无法在文档中找到这确实是真的保证运行ce。任何人都可以确认这是真的吗?
确实有趣的是,文档似乎没有明确说明在 WaitOne
的情况下只有 1 个线程会收到信号。当您熟悉多线程理论时,这就变得不言自明了。
是的,在 Semaphore
上调用的 WaitOne
(以及在包含 Semaphore
的 WaitHandle
列表上调用的 WaitAny
)是由单个线程接收。如果您想从 MSDN 获取参考,那么这里是 Semaphore
是 WaitHandle
、which is:
的子 class
Encapsulates operating system–specific objects that wait for exclusive access to shared resources.
所以是的,除非明确说明的方法提供独占访问。
例如 ManualResetEvent
的方法 WaitOne
将为所有等待线程解锁,but documentation is explicit about it:
Notifies one or more waiting threads that an event has occurred.
受到 The Little Book of Semaphores 的启发,我决定使用 Semaphores 来实现生产者-消费者问题。
我特别希望能够随意停止所有的Worker线程。 我已经广泛测试了我的方法论,没有发现任何错误。
以下代码是用于测试的原型,可以运行作为控制台应用程序:
using System;
using System.Collections.Concurrent;
using System.Threading;
using NUnit.Framework;
public class ProducerConsumer
{
private static readonly int _numThreads = 5;
private static readonly int _numItemsEnqueued = 10;
private static readonly Semaphore _workItems = new Semaphore(0, int.MaxValue);
private static readonly ManualResetEvent _stop = new ManualResetEvent(false);
private static ConcurrentQueue<int> _queue;
public static void Main()
{
_queue = new ConcurrentQueue<int>();
// Create and start threads.
for (int i = 1; i <= _numThreads; i++)
{
Thread t = new Thread(new ParameterizedThreadStart(Worker));
// Start the thread, passing the number.
t.Start(i);
}
// Wait for half a second, to allow all the
// threads to start and to block on the semaphore.
Thread.Sleep(500);
Console.WriteLine(string.Format("Main thread adds {0} items to the queue and calls Release() {0} times.", _numItemsEnqueued));
for (int i = 1; i <= _numItemsEnqueued; i++)
{
Console.WriteLine("Waking up a worker thread.");
_queue.Enqueue(i);
_workItems.Release(); //wake up 1 worker
Thread.Sleep(2000); //sleep 2 sec so it's clear the threads get unblocked 1 by 1
}
// sleep for 5 seconds to allow threads to exit
Thread.Sleep(5000);
Assert.True(_queue.Count == 0);
Console.WriteLine("Main thread stops all threads.");
_stop.Set();
// wait a while to exit
Thread.Sleep(5000);
Console.WriteLine("Main thread exits.");
Console.WriteLine(string.Format("Last value of Semaphore was {0}.", _workItems.Release()));
Assert.True(_queue.Count == 0);
Console.WriteLine("Press Enter to exit.");
Console.ReadLine();
}
private static void Worker(object num)
{
// Each worker thread begins by requesting the semaphore.
Console.WriteLine("Thread {0} begins and waits for the semaphore.", num);
WaitHandle[] wait = { _workItems, _stop };
int signal;
while (0 == (signal = WaitHandle.WaitAny(wait)))
{
Console.WriteLine("Thread {0} becomes unblocked by Release() and has work to do.", num);
int res;
if (_queue.TryDequeue(out res))
{
Console.WriteLine("Thread {0} dequeues {1}.", num, res);
}
else
{
throw new Exception("this should not happen.");
}
}
if (signal == 1)
Console.WriteLine("Thread {0} was stopped.", num);
Console.WriteLine("Thread {0} exits.", num);
}
}
现在对于我的问题,我使用 WaitHandle.WaitAny(semaphore)
的前提是当我在信号量上调用 Release()
时,只有 1 个 Worker 会被唤醒。但是,我无法在文档中找到这确实是真的保证运行ce。任何人都可以确认这是真的吗?
确实有趣的是,文档似乎没有明确说明在 WaitOne
的情况下只有 1 个线程会收到信号。当您熟悉多线程理论时,这就变得不言自明了。
是的,在 Semaphore
上调用的 WaitOne
(以及在包含 Semaphore
的 WaitHandle
列表上调用的 WaitAny
)是由单个线程接收。如果您想从 MSDN 获取参考,那么这里是 Semaphore
是 WaitHandle
、which is:
Encapsulates operating system–specific objects that wait for exclusive access to shared resources.
所以是的,除非明确说明的方法提供独占访问。
例如 ManualResetEvent
的方法 WaitOne
将为所有等待线程解锁,but documentation is explicit about it:
Notifies one or more waiting threads that an event has occurred.