我可以更新 Parallel.For 正在使用的集合吗?
Can I update the collection that Parallel.For is using?
我有这样一种情况,我正在 运行执行一些任务,每个任务需要几秒到几分钟。我也有可能添加更多需要添加到已经 运行ning 并行循环的数据。是否可以更新 Parallel.For 正在使用的当前集合并让它继续迭代直到没有更多的对象可以检索?
这是显示我的问题的一些示例代码:
[Test]
public void DoesParallelForGetNewEntriesInLoop()
{
ConcurrentDictionary<int, string> dict = new ConcurrentDictionary<int, string>();
ConcurrentBag<string> bag = new ConcurrentBag<string>();
int i = 0;
// write to dictionary every 10ms simulating new additions
Timer t = new Timer(callback =>
{
dict.TryAdd(i++, "Value" + i);
}, dict, 0, 10);
// Add initial values
dict.TryAdd(i++, "Value" + i);
dict.TryAdd(i++, "Value" + i);
dict.TryAdd(i++, "Value" + i);
Parallel.For(0, dict.Count, (a, state) =>
{
string val = string.Empty;
if (dict.TryGetValue(a, out val))
{
bag.Add(val + Environment.NewLine);
}
if (i++ == 50)
state.Stop();
Thread.Sleep(5000);
});
foreach (var item in bag)
{
File.AppendAllText("parallelWrite.txt", item);
}
}
当我运行这个时,我得到的结果很简单:
Value2
Value1
Value3
Value4
是否有更好的方法来完成我在这里尝试做的事情?
Parallel.For
中的 from 和 to 参数只在循环开始前计算一次。
使用 Parallel.ForEach
迭代新项目。
我不确定您要实现什么目标,但更好的方法可能是将新数据放入 stack/queue 并定期弹出数据并进行处理。
在 Parallel.ForEach
中使用 BlockingCollection
并调用 GetConsumingEnumerable()
怎么样?
BlockingCollection<string> collection = new BlockingCollection<string>();
Parallel.ForEach(collection.GetConsumingEnumerable(), (x) => Console.WriteLine(x));
您可以使用 BlockingCollection 的 Add()
方法向集合中添加内容。
技术上 "double locking" 正在进行,因为 Parallel.ForEach 在从可枚举项中获取大块项目进行处理时锁定集合,并且构建 BlockingCollection 以支持多个消费者,因此它也实现了锁定。如果这成为性能问题(很可能),那么您可以为 BlockingCollection 实现自己的分区程序,因为 Parallel.ForEach 具有接受 OrderablePartitioner 和 Partitioner 的重载。这里有一篇很好的文章描述了如何:http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx
我有这样一种情况,我正在 运行执行一些任务,每个任务需要几秒到几分钟。我也有可能添加更多需要添加到已经 运行ning 并行循环的数据。是否可以更新 Parallel.For 正在使用的当前集合并让它继续迭代直到没有更多的对象可以检索? 这是显示我的问题的一些示例代码:
[Test]
public void DoesParallelForGetNewEntriesInLoop()
{
ConcurrentDictionary<int, string> dict = new ConcurrentDictionary<int, string>();
ConcurrentBag<string> bag = new ConcurrentBag<string>();
int i = 0;
// write to dictionary every 10ms simulating new additions
Timer t = new Timer(callback =>
{
dict.TryAdd(i++, "Value" + i);
}, dict, 0, 10);
// Add initial values
dict.TryAdd(i++, "Value" + i);
dict.TryAdd(i++, "Value" + i);
dict.TryAdd(i++, "Value" + i);
Parallel.For(0, dict.Count, (a, state) =>
{
string val = string.Empty;
if (dict.TryGetValue(a, out val))
{
bag.Add(val + Environment.NewLine);
}
if (i++ == 50)
state.Stop();
Thread.Sleep(5000);
});
foreach (var item in bag)
{
File.AppendAllText("parallelWrite.txt", item);
}
}
当我运行这个时,我得到的结果很简单:
Value2
Value1
Value3
Value4
是否有更好的方法来完成我在这里尝试做的事情?
Parallel.For
中的 from 和 to 参数只在循环开始前计算一次。
使用 Parallel.ForEach
迭代新项目。
我不确定您要实现什么目标,但更好的方法可能是将新数据放入 stack/queue 并定期弹出数据并进行处理。
在 Parallel.ForEach
BlockingCollection
并调用 GetConsumingEnumerable()
怎么样?
BlockingCollection<string> collection = new BlockingCollection<string>();
Parallel.ForEach(collection.GetConsumingEnumerable(), (x) => Console.WriteLine(x));
您可以使用 BlockingCollection 的 Add()
方法向集合中添加内容。
技术上 "double locking" 正在进行,因为 Parallel.ForEach 在从可枚举项中获取大块项目进行处理时锁定集合,并且构建 BlockingCollection 以支持多个消费者,因此它也实现了锁定。如果这成为性能问题(很可能),那么您可以为 BlockingCollection 实现自己的分区程序,因为 Parallel.ForEach 具有接受 OrderablePartitioner 和 Partitioner 的重载。这里有一篇很好的文章描述了如何:http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx