使用信号量的 C# 生产者-消费者

C# Producer-Consumer using Semaphores

受到 The Little Book of Semaphores 的启发,我决定使用 Semaphores 来实现生产者-消费者问题。

我特别希望能够随意停止所有的Worker线程。 我已经广泛测试了我的方法论,没有发现任何错误。

以下代码是用于测试的原型,可以运行作为控制台应用程序:

using System;
using System.Collections.Concurrent;
using System.Threading;
using NUnit.Framework;

public class ProducerConsumer
{
    private static readonly int _numThreads = 5;
    private static readonly int _numItemsEnqueued = 10;
    private static readonly Semaphore _workItems = new Semaphore(0, int.MaxValue);
    private static readonly ManualResetEvent _stop = new ManualResetEvent(false);
    private static ConcurrentQueue<int> _queue;

    public static void Main()
    {
        _queue = new ConcurrentQueue<int>();

        // Create and start threads.
        for (int i = 1; i <= _numThreads; i++)
        {
            Thread t = new Thread(new ParameterizedThreadStart(Worker));

            // Start the thread, passing the number.
            t.Start(i);
        }

        // Wait for half a second, to allow all the
        // threads to start and to block on the semaphore.
        Thread.Sleep(500);

        Console.WriteLine(string.Format("Main thread adds {0} items to the queue and calls Release() {0} times.", _numItemsEnqueued));
        for (int i = 1; i <= _numItemsEnqueued; i++)
        {
            Console.WriteLine("Waking up a worker thread.");
            _queue.Enqueue(i);
            _workItems.Release(); //wake up 1 worker
            Thread.Sleep(2000); //sleep 2 sec so it's clear the threads get unblocked 1 by 1
        }

        // sleep for 5 seconds to allow threads to exit
        Thread.Sleep(5000);
        Assert.True(_queue.Count == 0);

        Console.WriteLine("Main thread stops all threads.");
        _stop.Set();

        // wait a while to exit
        Thread.Sleep(5000);
        Console.WriteLine("Main thread exits.");
        Console.WriteLine(string.Format("Last value of Semaphore was {0}.", _workItems.Release()));
        Assert.True(_queue.Count == 0);
        Console.WriteLine("Press Enter to exit.");
        Console.ReadLine();
    }

    private static void Worker(object num)
    {
        // Each worker thread begins by requesting the semaphore.
        Console.WriteLine("Thread {0} begins and waits for the semaphore.", num);
        WaitHandle[] wait = { _workItems, _stop };
        int signal;
        while (0 == (signal = WaitHandle.WaitAny(wait)))
        {
            Console.WriteLine("Thread {0} becomes unblocked by Release() and has work to do.", num);
            int res;
            if (_queue.TryDequeue(out res))
            {
                Console.WriteLine("Thread {0} dequeues {1}.", num, res);
            }
            else
            {
                throw new Exception("this should not happen.");
            }
        }

        if (signal == 1)
            Console.WriteLine("Thread {0} was stopped.", num);

        Console.WriteLine("Thread {0} exits.", num);
    }
}

现在对于我的问题,我使用 WaitHandle.WaitAny(semaphore) 的前提是当我在信号量上调用 Release() 时,只有 1 个 Worker 会被唤醒。但是,我无法在文档中找到这确实是真的保证运行ce。任何人都可以确认这是真的吗?

确实有趣的是,文档似乎没有明确说明在 WaitOne 的情况下只有 1 个线程会收到信号。当您熟悉多线程理论时,这就变得不言自明了。

是的,在 Semaphore 上调用的 WaitOne(以及在包含 SemaphoreWaitHandle 列表上调用的 WaitAny)是由单个线程接收。如果您想从 MSDN 获取参考,那么这里是 SemaphoreWaitHandlewhich is:

的子 class

Encapsulates operating system–specific objects that wait for exclusive access to shared resources.

所以是的,除非明确说明的方法提供独占访问。

例如 ManualResetEvent 的方法 WaitOne 将为所有等待线程解锁,but documentation is explicit about it:

Notifies one or more waiting threads that an event has occurred.