如何知道何时停止并行 foreach,其中消费者也是 C# 中的生产者

How to know when to stop a parallel foreach where the consumer is also the producer in C#

我正在尝试使用 Parallel.ForEach() 并行处理 BlockingCollection 中的一些项目。当处理一个项目时,它可以生成 0-2 个以上的项目来处理。要处理的项目数最终总是会达到 0。

我的问题是,由于消费者也是生产者(处理项目可以生成更多要处理的项目),当 BlockingCollection 为空时我无法调用 BlockingCollection 的 CompleteAdding(),因为当前可能有其他线程正在处理一个将生成更多项目的项目。因此我不知道如何让 BlockingCollection/Parallel.ForEach 知道它可以退出。

这是一个情况示例(为简单起见进行了修改)

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace Example
{
    class Example
    {
        static void Main(string[] args)
        {
            var process = new BlockingCollection<int>() { 30 };

            var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };

            Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>
            {
                if (item > 20)
                {
                    // Some add 2 items
                    process.Add(item - 1);
                    process.Add(item - 1);
                    Console.WriteLine($"process.Count: {process.Count} | item: {item} | Added: 2");
                }
                else if (item > 10)
                {
                    // Some add 1 item
                    process.Add(item-1);
                    Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 1");
                }
                else
                {
                    // Some add 0 items
                    Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 0");
                }
            });

            // Parallel.ForEach never exits
            Console.WriteLine("Completed Processing");

            Console.ReadKey();
        }
    }
}

我尝试在 Parallel.ForEach 期间将 MaxDegreeOfParallelism 修改为要处理的项目数的最小值,并且 Environment.ProcessorCount 但在 Parallel.ForEach 期间没有任何作用。

我还尝试过存储未处理项目的数量,并在每个线程上更新该数量时执行锁定。当未处理的项目为 0 时,我将调用 AddingCompleted 方法。这也不行。

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace Example
{
    class Example
    {
        static void Main(string[] args)
        {
            var runningLock = new object();
            int running = 0;

            var process = new BlockingCollection<int>() { 30 };

            var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };

            Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>
            {
                lock (runningLock)
                {
                    running++;
                }

                if (item > 20)
                {
                    // Some add 2 items
                    process.Add(item - 1);
                    process.Add(item - 1);
                    Console.WriteLine($"process.Count: {process.Count} | item: {item} | Added: 2 | running: {running}");
                }
                else if (item > 10)
                {
                    // Some add 1 item
                    process.Add(item - 1);
                    Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 1 | running: {running}");
                }
                else
                {
                    // Some add 0 items
                    Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 0 | running: {running}");
                }

                lock (runningLock)
                {
                    running--;

                    if (running == 0 && process.Count == 0)
                    {
                        Console.WriteLine($"Stopping | running: {running} | process.Count: {process.Count}");
                        process.CompleteAdding();
                    }
                }
            });

            // Parallel.ForEach never exits
            Console.WriteLine("Completed Processing");

            Console.ReadKey();
        }
    }
}

我应该用别的东西代替 Parallel.ForEach 吗?

此外,当将 MaxDegreeOfParallelism 设置为 1 时。如果 BlockingCollection 的初始项目 >= 27,它会正常处理所有内容,但是,如果 <= 26,它会停止处理大约 16 的项目?此外,较高的 MaxDegreeOfParallelism 会导致以较低的数量停止处理项目。

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace Example
{
    class Example
    {
        static void Main(string[] args)
        {
            // Normal
            var process = new BlockingCollection<int>() { 27 };
            // Stops around 16
            //var process = new BlockingCollection<int>() { 26 };

            var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = 1 };

            Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>
            {
                if (item > 20)
                {
                    // Some add 2 items
                    process.Add(item - 1);
                    process.Add(item - 1);
                    Console.WriteLine($"Process Size: {process.Count} | Current Num: {item} | Added: 2");
                }
                else if (item > 10)
                {
                    // Some add 1 item
                    process.Add(item - 1);
                    Console.WriteLine($"Process Size: {process.Count}| Current Num: {item} | Added: 1");
                }
                else
                {
                    // Some add 0 items
                    Console.WriteLine($"Process Size: {process.Count}| Current Num: {item} | Added: 0");
                }
            });

            // Parallel.ForEach never exits
            Console.WriteLine("Completed Processing");

            Console.ReadKey();
        }
    }
}

这里是 actual code 如果有人更喜欢查看实际代码而不是抽象版本。

你在这方面走在了正确的轨道上:

I've also tried storing a count of the number of unprocessed items and performing a lock when updating this number on each thread. When the unprocessed items is 0 then I call the AddingCompleted method.

问题是您实际上是在计算活跃工作人员的数量,而不是未处理项目的数量。 IE。当你开始处理某些东西时,你只会增加你的计数器,所以队列中可能有许多其他项目没有被那个计数器代表。要执行后者,您需要做的是每次向队列中添加内容时增加一个计数器,然后每次完成处理队列中的内容时减少一个计数器。

现在,如果您尝试过,您可能 运行 会遇到一个不同的问题:默认情况下,Parallel.ForEach() 方法会尝试从源中批量处理项目。这不适用于像 BlockingCollection<T> 这样的源,它可以在枚举期间阻塞,等待额外的数据。在您的示例中,这会导致死锁,其中 Parallel.ForEach() 正在等待更多项目,然后才将最近的批次排队,而 BlockingCollection<T> 正在等待更多项目被处理,从而导致更多项目排队。

如果 ForEach() 方法等待集合,而集合等待 ForEach() 方法,就会出现死锁。

虽然有一个解决方法:您可以提供 ForEach() 分区程序,该分区程序专门配置为不缓冲数据,而是在检索工作项时立即将其排队。

将这两种策略放在一起,您会得到一个看起来像这样的代码版本(我为诊断目的添加了一些小的输出更改):

static void Main(string[] args)
{
    const int firstValue = 30;
    const int secondValues = 20;
    const int thirdValues = 10;

    var process = new BlockingCollection<int>() { firstValue };

    var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
    int totalItemCount = process.Count;

    OrderablePartitioner<int> partitioner = Partitioner.Create(process.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);

    Parallel.ForEach(partitioner, parallelOptions, (item, state, i) =>
    {
        string message;

        if (item > secondValues)
        {
            // Some add 2 items
            Interlocked.Add(ref totalItemCount, 2);
            process.Add(item - 1);
            process.Add(item - 1);
            message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count} | item: {item} | Added: 2";
        }
        else if (item > thirdValues)
        {
            // Some add 1 item
            Interlocked.Increment(ref totalItemCount);
            process.Add(item - 1);
            message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 1";
        }
        else
        {
            // Some add 0 items
            message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 0";
        }

        int newCount = Interlocked.Decrement(ref totalItemCount);

        if (newCount == 0)
        {
            process.CompleteAdding();
        }

        Console.WriteLine($"{message} | newCount: {newCount} | i: {i}");
    });

    // Parallel.ForEach will exit
    Console.WriteLine("Completed Processing");    
    Console.ReadKey();
}