如何知道何时停止并行 foreach,其中消费者也是 C# 中的生产者
How to know when to stop a parallel foreach where the consumer is also the producer in C#
我正在尝试使用 Parallel.ForEach() 并行处理 BlockingCollection 中的一些项目。当处理一个项目时,它可以生成 0-2 个以上的项目来处理。要处理的项目数最终总是会达到 0。
我的问题是,由于消费者也是生产者(处理项目可以生成更多要处理的项目),当 BlockingCollection 为空时我无法调用 BlockingCollection 的 CompleteAdding(),因为当前可能有其他线程正在处理一个将生成更多项目的项目。因此我不知道如何让 BlockingCollection/Parallel.ForEach 知道它可以退出。
这是一个情况示例(为简单起见进行了修改)
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace Example
{
class Example
{
static void Main(string[] args)
{
var process = new BlockingCollection<int>() { 30 };
var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>
{
if (item > 20)
{
// Some add 2 items
process.Add(item - 1);
process.Add(item - 1);
Console.WriteLine($"process.Count: {process.Count} | item: {item} | Added: 2");
}
else if (item > 10)
{
// Some add 1 item
process.Add(item-1);
Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 1");
}
else
{
// Some add 0 items
Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 0");
}
});
// Parallel.ForEach never exits
Console.WriteLine("Completed Processing");
Console.ReadKey();
}
}
}
我尝试在 Parallel.ForEach 期间将 MaxDegreeOfParallelism 修改为要处理的项目数的最小值,并且 Environment.ProcessorCount 但在 Parallel.ForEach 期间没有任何作用。
我还尝试过存储未处理项目的数量,并在每个线程上更新该数量时执行锁定。当未处理的项目为 0 时,我将调用 AddingCompleted 方法。这也不行。
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace Example
{
class Example
{
static void Main(string[] args)
{
var runningLock = new object();
int running = 0;
var process = new BlockingCollection<int>() { 30 };
var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>
{
lock (runningLock)
{
running++;
}
if (item > 20)
{
// Some add 2 items
process.Add(item - 1);
process.Add(item - 1);
Console.WriteLine($"process.Count: {process.Count} | item: {item} | Added: 2 | running: {running}");
}
else if (item > 10)
{
// Some add 1 item
process.Add(item - 1);
Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 1 | running: {running}");
}
else
{
// Some add 0 items
Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 0 | running: {running}");
}
lock (runningLock)
{
running--;
if (running == 0 && process.Count == 0)
{
Console.WriteLine($"Stopping | running: {running} | process.Count: {process.Count}");
process.CompleteAdding();
}
}
});
// Parallel.ForEach never exits
Console.WriteLine("Completed Processing");
Console.ReadKey();
}
}
}
我应该用别的东西代替 Parallel.ForEach 吗?
此外,当将 MaxDegreeOfParallelism 设置为 1 时。如果 BlockingCollection 的初始项目 >= 27,它会正常处理所有内容,但是,如果 <= 26,它会停止处理大约 16 的项目?此外,较高的 MaxDegreeOfParallelism 会导致以较低的数量停止处理项目。
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace Example
{
class Example
{
static void Main(string[] args)
{
// Normal
var process = new BlockingCollection<int>() { 27 };
// Stops around 16
//var process = new BlockingCollection<int>() { 26 };
var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = 1 };
Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>
{
if (item > 20)
{
// Some add 2 items
process.Add(item - 1);
process.Add(item - 1);
Console.WriteLine($"Process Size: {process.Count} | Current Num: {item} | Added: 2");
}
else if (item > 10)
{
// Some add 1 item
process.Add(item - 1);
Console.WriteLine($"Process Size: {process.Count}| Current Num: {item} | Added: 1");
}
else
{
// Some add 0 items
Console.WriteLine($"Process Size: {process.Count}| Current Num: {item} | Added: 0");
}
});
// Parallel.ForEach never exits
Console.WriteLine("Completed Processing");
Console.ReadKey();
}
}
}
这里是 actual code 如果有人更喜欢查看实际代码而不是抽象版本。
你在这方面走在了正确的轨道上:
I've also tried storing a count of the number of unprocessed items and performing a lock when updating this number on each thread. When the unprocessed items is 0 then I call the AddingCompleted method.
问题是您实际上是在计算活跃工作人员的数量,而不是未处理项目的数量。 IE。当你开始处理某些东西时,你只会增加你的计数器,所以队列中可能有许多其他项目没有被那个计数器代表。要执行后者,您需要做的是每次向队列中添加内容时增加一个计数器,然后每次完成处理队列中的内容时减少一个计数器。
现在,如果您尝试过,您可能 运行 会遇到一个不同的问题:默认情况下,Parallel.ForEach()
方法会尝试从源中批量处理项目。这不适用于像 BlockingCollection<T>
这样的源,它可以在枚举期间阻塞,等待额外的数据。在您的示例中,这会导致死锁,其中 Parallel.ForEach()
正在等待更多项目,然后才将最近的批次排队,而 BlockingCollection<T>
正在等待更多项目被处理,从而导致更多项目排队。
如果 ForEach()
方法等待集合,而集合等待 ForEach()
方法,就会出现死锁。
虽然有一个解决方法:您可以提供 ForEach()
分区程序,该分区程序专门配置为不缓冲数据,而是在检索工作项时立即将其排队。
将这两种策略放在一起,您会得到一个看起来像这样的代码版本(我为诊断目的添加了一些小的输出更改):
static void Main(string[] args)
{
const int firstValue = 30;
const int secondValues = 20;
const int thirdValues = 10;
var process = new BlockingCollection<int>() { firstValue };
var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
int totalItemCount = process.Count;
OrderablePartitioner<int> partitioner = Partitioner.Create(process.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(partitioner, parallelOptions, (item, state, i) =>
{
string message;
if (item > secondValues)
{
// Some add 2 items
Interlocked.Add(ref totalItemCount, 2);
process.Add(item - 1);
process.Add(item - 1);
message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count} | item: {item} | Added: 2";
}
else if (item > thirdValues)
{
// Some add 1 item
Interlocked.Increment(ref totalItemCount);
process.Add(item - 1);
message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 1";
}
else
{
// Some add 0 items
message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 0";
}
int newCount = Interlocked.Decrement(ref totalItemCount);
if (newCount == 0)
{
process.CompleteAdding();
}
Console.WriteLine($"{message} | newCount: {newCount} | i: {i}");
});
// Parallel.ForEach will exit
Console.WriteLine("Completed Processing");
Console.ReadKey();
}
我正在尝试使用 Parallel.ForEach() 并行处理 BlockingCollection 中的一些项目。当处理一个项目时,它可以生成 0-2 个以上的项目来处理。要处理的项目数最终总是会达到 0。
我的问题是,由于消费者也是生产者(处理项目可以生成更多要处理的项目),当 BlockingCollection 为空时我无法调用 BlockingCollection 的 CompleteAdding(),因为当前可能有其他线程正在处理一个将生成更多项目的项目。因此我不知道如何让 BlockingCollection/Parallel.ForEach 知道它可以退出。
这是一个情况示例(为简单起见进行了修改)
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace Example
{
class Example
{
static void Main(string[] args)
{
var process = new BlockingCollection<int>() { 30 };
var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>
{
if (item > 20)
{
// Some add 2 items
process.Add(item - 1);
process.Add(item - 1);
Console.WriteLine($"process.Count: {process.Count} | item: {item} | Added: 2");
}
else if (item > 10)
{
// Some add 1 item
process.Add(item-1);
Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 1");
}
else
{
// Some add 0 items
Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 0");
}
});
// Parallel.ForEach never exits
Console.WriteLine("Completed Processing");
Console.ReadKey();
}
}
}
我尝试在 Parallel.ForEach 期间将 MaxDegreeOfParallelism 修改为要处理的项目数的最小值,并且 Environment.ProcessorCount 但在 Parallel.ForEach 期间没有任何作用。
我还尝试过存储未处理项目的数量,并在每个线程上更新该数量时执行锁定。当未处理的项目为 0 时,我将调用 AddingCompleted 方法。这也不行。
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace Example
{
class Example
{
static void Main(string[] args)
{
var runningLock = new object();
int running = 0;
var process = new BlockingCollection<int>() { 30 };
var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>
{
lock (runningLock)
{
running++;
}
if (item > 20)
{
// Some add 2 items
process.Add(item - 1);
process.Add(item - 1);
Console.WriteLine($"process.Count: {process.Count} | item: {item} | Added: 2 | running: {running}");
}
else if (item > 10)
{
// Some add 1 item
process.Add(item - 1);
Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 1 | running: {running}");
}
else
{
// Some add 0 items
Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 0 | running: {running}");
}
lock (runningLock)
{
running--;
if (running == 0 && process.Count == 0)
{
Console.WriteLine($"Stopping | running: {running} | process.Count: {process.Count}");
process.CompleteAdding();
}
}
});
// Parallel.ForEach never exits
Console.WriteLine("Completed Processing");
Console.ReadKey();
}
}
}
我应该用别的东西代替 Parallel.ForEach 吗?
此外,当将 MaxDegreeOfParallelism 设置为 1 时。如果 BlockingCollection 的初始项目 >= 27,它会正常处理所有内容,但是,如果 <= 26,它会停止处理大约 16 的项目?此外,较高的 MaxDegreeOfParallelism 会导致以较低的数量停止处理项目。
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace Example
{
class Example
{
static void Main(string[] args)
{
// Normal
var process = new BlockingCollection<int>() { 27 };
// Stops around 16
//var process = new BlockingCollection<int>() { 26 };
var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = 1 };
Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>
{
if (item > 20)
{
// Some add 2 items
process.Add(item - 1);
process.Add(item - 1);
Console.WriteLine($"Process Size: {process.Count} | Current Num: {item} | Added: 2");
}
else if (item > 10)
{
// Some add 1 item
process.Add(item - 1);
Console.WriteLine($"Process Size: {process.Count}| Current Num: {item} | Added: 1");
}
else
{
// Some add 0 items
Console.WriteLine($"Process Size: {process.Count}| Current Num: {item} | Added: 0");
}
});
// Parallel.ForEach never exits
Console.WriteLine("Completed Processing");
Console.ReadKey();
}
}
}
这里是 actual code 如果有人更喜欢查看实际代码而不是抽象版本。
你在这方面走在了正确的轨道上:
I've also tried storing a count of the number of unprocessed items and performing a lock when updating this number on each thread. When the unprocessed items is 0 then I call the AddingCompleted method.
问题是您实际上是在计算活跃工作人员的数量,而不是未处理项目的数量。 IE。当你开始处理某些东西时,你只会增加你的计数器,所以队列中可能有许多其他项目没有被那个计数器代表。要执行后者,您需要做的是每次向队列中添加内容时增加一个计数器,然后每次完成处理队列中的内容时减少一个计数器。
现在,如果您尝试过,您可能 运行 会遇到一个不同的问题:默认情况下,Parallel.ForEach()
方法会尝试从源中批量处理项目。这不适用于像 BlockingCollection<T>
这样的源,它可以在枚举期间阻塞,等待额外的数据。在您的示例中,这会导致死锁,其中 Parallel.ForEach()
正在等待更多项目,然后才将最近的批次排队,而 BlockingCollection<T>
正在等待更多项目被处理,从而导致更多项目排队。
如果 ForEach()
方法等待集合,而集合等待 ForEach()
方法,就会出现死锁。
虽然有一个解决方法:您可以提供 ForEach()
分区程序,该分区程序专门配置为不缓冲数据,而是在检索工作项时立即将其排队。
将这两种策略放在一起,您会得到一个看起来像这样的代码版本(我为诊断目的添加了一些小的输出更改):
static void Main(string[] args)
{
const int firstValue = 30;
const int secondValues = 20;
const int thirdValues = 10;
var process = new BlockingCollection<int>() { firstValue };
var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
int totalItemCount = process.Count;
OrderablePartitioner<int> partitioner = Partitioner.Create(process.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(partitioner, parallelOptions, (item, state, i) =>
{
string message;
if (item > secondValues)
{
// Some add 2 items
Interlocked.Add(ref totalItemCount, 2);
process.Add(item - 1);
process.Add(item - 1);
message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count} | item: {item} | Added: 2";
}
else if (item > thirdValues)
{
// Some add 1 item
Interlocked.Increment(ref totalItemCount);
process.Add(item - 1);
message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 1";
}
else
{
// Some add 0 items
message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 0";
}
int newCount = Interlocked.Decrement(ref totalItemCount);
if (newCount == 0)
{
process.CompleteAdding();
}
Console.WriteLine($"{message} | newCount: {newCount} | i: {i}");
});
// Parallel.ForEach will exit
Console.WriteLine("Completed Processing");
Console.ReadKey();
}