c# 多线程等待 ManualResetEvent

c# multiple threads waiting for a ManualResetEvent

我正在研究多线程并制作某种任务引擎。这个想法是引擎可以有一个可配置数量的线程等待,当一个新任务到达时,第一个空闲线程将它拾起并执行它。

问题是 2 个线程以某种方式拾取了相同的任务。我仔细查看了一下,我认为这段代码应该可以工作,但显然不行。如果我在现在被注释掉的地方添加 10ms 睡眠,它就可以工作,但我不确定我明白为什么。它看起来像 .Reset() 函数 returns 在它实际重置事件之前?

有人可以解释一下吗?当有多个等待时,有没有更好的方法只让一个线程继续?

谢谢

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace TaskTest
{
    public class Engine
    {
        private ManualResetEvent taskEvent;
        private ConcurrentQueue<Task> tasks;
        private bool running;
        private List<Thread> threads;
        private int threadAmount;
        private int threadsBusy = 0;

        public Engine(int amountOfThreads)
        {
            taskEvent = new ManualResetEvent(false);
            tasks = new ConcurrentQueue<Task>();
            threads = new List<Thread>();

            threadAmount = amountOfThreads;
        }

        public void Start()
        {
            running = true;
            for (var i = 0; i < threadAmount; i++)
            {
                var thread = new Thread(Process);
                thread.Name = "Thread " + i;
                threads.Add(thread);
                thread.Start();
            }
        }

        public void Stop()
        {
            running = false;
            taskEvent.Set();
            threads.ForEach(t => t.Join());
        }

        private void Process()
        {
            while (running)
            {
                lock (taskEvent)
                {
                    // Lock it so only a single thread is waiting on the event at the same time
                    taskEvent.WaitOne();
                    taskEvent.Reset();
                    //Thread.Sleep(10);
                }

                if (!running)
                {
                    taskEvent.Set();
                    return;
                }

                threadsBusy += 1;
                if (threadsBusy > 1)
                    Console.WriteLine("Failed");

                Task task;
                if (tasks.TryDequeue(out task))
                    task.Execute();

                threadsBusy -= 1;
            }
        }

        public void Enqueue(Task t)
        {
            tasks.Enqueue(t);
            taskEvent.Set();
        }
    }
}

编辑 其余代码:

namespace TaskTest
{
    public class Start
    {
        public static void Main(params string[] args)
        {
            var engine = new Engine(4);
            engine.Start();

            while (true)
            {
                Console.Read();
                engine.Enqueue(new Task());
            }
        }
    }
}


namespace TaskTest
{
    public class Task
    {
        public void Execute()
        {
            Console.WriteLine(Thread.CurrentThread.Name);
        }
    }
}

在按键上使用 Console.Read() 时,会从输入中读取两个字符。您应该改用 Console.ReadLine()

请注意,使用 BlockingCollection 处理同步可以大大简化您的代码:

public class Engine
{
    private BlockingCollection<Task> tasks;
    private List<Thread> threads;
    private int threadAmount;

    public Engine(int amountOfThreads)
    {
        tasks = new BlockingCollection<Task>();
        threads = new List<Thread>();

        threadAmount = amountOfThreads;
    }

    public void Start()
    {
        for (var i = 0; i < threadAmount; i++)
        {
            var thread = new Thread(Process);
            thread.Name = "Thread " + i;
            threads.Add(thread);
            thread.Start();
        }
    }

    public void Stop()
    {
        tasks.CompleteAdding();
        threads.ForEach(t => t.Join());
    }

    private void Process()
    {
        foreach (var task in tasks.GetConsumingEnumerable())
        {
            task.Execute();
        }
    }

    public void Enqueue(Task t)
    {
        tasks.Add(t);
    }
}