ManualResetEvent 不等待线程池完成

ManualResetEvent is not waiting for threadpool completion

我有要处理的批次列表。永远。
我想并行处理每个块 (5),完成后移动到下一个块。
出于某种原因,下面的代码不会等待块完成并继续,即使它没有完成。

while (true)
{
    foreach (string[] urlsArr in chunks)
    { 
        int i = 0;
        foreach (var url in urlsArr)
        {
            ThreadPool.QueueUserWorkItem(x =>
            {
                ProccessUrl(url, config, drivers[i]);
                _resetEvent.Set();
                i++;
            });
        }
        _resetEvent.WaitOne();// this is not really waiting.
    }
}

看看 Semaphore 或者它的瘦身版。信号量将允许您始终只有 5 个线程 运行。一旦这些 运行 个线程中的任何一个完成,它就可以开始新的工作。这样效率更高,尤其是在工作负载不均衡的情况下。考虑 1 个项目需要一个小时来处理而其他 4 个需要一秒钟的情况。在这种情况下,4 个线程将等待最后一个线程完成,然后再开始任何其他工作。

有关示例,请参阅 Need to understand the usage of SemaphoreSlim

在您的代码中,问题在于您只有一个等待句柄和 5 个线程。当 5 个 运行 线程中的任何一个完成工作时,它将设置等待句柄,从而允许您的外部循环继续进行,从而启动另外五个线程。到现在为止,内循环的前 4 个线程可能已经完成并且它们中的任何一个都可以再次设置等待句柄!现在,你看到这里有问题了吗?

根据 Hans 的评论,如果单个批次中的工作项之间存在依赖关系,那么在开始下一批次之前必须完成所有工作项,您应该查看 CountDownEvent

我认为您或许可以简化整个事情,并利用 Parallel.ForEach() 来管理线程并将并发度限制为 5。

如果您 运行 下面的示例代码,您会看到伪装的 URL 以 5 个为一组进行处理,因为并发线程的数量被限制为 5。

如果你这样做,你将不需要自己的分块逻辑:

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication2
{
    class Program
    {
        static void Main()
        {
            // Make some pretend URLs for this demo.

            string[] urls = Enumerable.Range(1, 100).Select(n => n.ToString()).ToArray();

            // Use Parallel.ForEach() along with MaxDegreeOfParallelism = 5 to process
            // these using 5 threads maximum:

            Parallel.ForEach(
                urls,
                new ParallelOptions{MaxDegreeOfParallelism = 5},
                processUrl
            );
        }

        static void processUrl(string url)
        {
            Console.WriteLine("Processing " + url);
            Thread.Sleep(1000);
            Console.WriteLine("Processed " + url);
        }
    }
}

这是一个带有任务的版本(async/await)

while (true)
        {
            foreach (string[] urlsArr in chunks)
            {
                Task[] tasks = new Task[urlsArr.Length];
                for (int i = 0; i < urlsArr.Length; i++)
                {
                    var url = urlsArr[i];
                    var driver = drivers[i];
                    tasks[i] = Task.Run(() => { ProccessUrl(url, config, driver); });
                }

                await Task.WhenAll(tasks);
            }
        }

请注意,它还修复了原始代码中的 'i' 变量未以线程安全方式递增的问题(可以通过 Interlocked.Increment 修复)。

如果您的代码不是 async 您可以等待线程中的任务完成(但这是阻塞的)

Task.WhenAll(tasks).Wait();