Parallel.Foreach() 没有结果
Parallel.Foreach() yields no result
我正在尝试使用 Parallel.Foreach()
并行查询 mongo-db,但我没有得到任何结果。但是当我尝试 运行 在常规 foreach 循环中做同样的事情时,我能够执行预期的任务。
var exceptions = new ConcurrentQueue<Exception>();
var secondaryObjectsDictionaryCollection = new Dictionary<string, List<JObject>>();
// This works
foreach(var info in infos)
{
try
{
name = await commonValidator.ValidateAsync(name);
await commonValidator.ValidateIdAsync(name, id);
var list = await helper.ListRelatedObjectsAsync(name, id, info, false);
secondaryObjectsDictionaryCollection.Add(info.PrimaryId, secondaryObjectsList.ToList());
}
catch (Exception ex)
{
exceptions.Enqueue(ex);
}
}
//This does not
Parallel.ForEach(infos, async info =>
{
try
{
name = await commonValidator.ValidateAsync(name);
await commonValidator.ValidateIdAsync(name, id);
var list = await helper.ListRelatedObjectsAsync(name, id, info, false);
secondaryObjectsDictionaryCollection.Add(info.PrimaryId, secondaryObjectsList.ToList());
}
catch (Exception ex)
{
exceptions.Enqueue(ex);
}
});
我只想并行执行此任务,因为涉及不同的 mongodb 集合,并且还可以减少响应时间。
我无法弄清楚我的并行循环中出了什么问题。
并行执行这些任务的任何其他方法也将起作用。
Parallel.ForEach 与传入 async
方法不兼容。如果你想要类似于 Parallel.ForEach 的东西,你可以使用 Dataflow,它是 ActionBlock。
var workerBlock = new ActionBlock<Info>(async info =>
{
try
{
name = await commonValidator.ValidateAsync(name);
await commonValidator.ValidateIdAsync(name, id);
var list = await helper.ListRelatedObjectsAsync(name, id, info, false);
//Note this is not thread safe and you need to put a lock around it.
lock (secondaryObjectsDictionaryCollection)
{
secondaryObjectsDictionaryCollection.Add(info.PrimaryId, secondaryObjectsList.ToList());
}
}
catch (Exception ex)
{
exceptions.Enqueue(ex);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
foreach(var info in infos)
{
workerBlock.Post(info);
}
workerBlock.Complete();
让我们看一个更简单的例子来说明同样的问题
您有类似的代码
var results = new Dictionary<int, int>();
Parallel.ForEach(Enumerable.Range(0, 5), async index =>
{
var result = await DoAsyncJob(index);
results.TryAdd(index, result);
});
您的代码没有 运行 因为表达式
async index => {...}
returns 未等待的任务
像这样
Parallel.ForEach(Enumerable.Range(0, 5), index => new Task());
顺便说一下,当你像你的例子一样使用多线程时,你应该使用 ConcurrentDictionary 而不是字典,当你进行并行更新以避免错误和死锁
这里的最佳解决方案是不要使用并行循环,而是使用 Task.WhenAll
var tasks = Enumerable.Range(0, 5).Select(async index =>
{
var result = await DoAsyncJob(index);
results.TryAdd(index, result);
});
await Task.WhenAll(tasks);
我正在尝试使用 Parallel.Foreach()
并行查询 mongo-db,但我没有得到任何结果。但是当我尝试 运行 在常规 foreach 循环中做同样的事情时,我能够执行预期的任务。
var exceptions = new ConcurrentQueue<Exception>();
var secondaryObjectsDictionaryCollection = new Dictionary<string, List<JObject>>();
// This works
foreach(var info in infos)
{
try
{
name = await commonValidator.ValidateAsync(name);
await commonValidator.ValidateIdAsync(name, id);
var list = await helper.ListRelatedObjectsAsync(name, id, info, false);
secondaryObjectsDictionaryCollection.Add(info.PrimaryId, secondaryObjectsList.ToList());
}
catch (Exception ex)
{
exceptions.Enqueue(ex);
}
}
//This does not
Parallel.ForEach(infos, async info =>
{
try
{
name = await commonValidator.ValidateAsync(name);
await commonValidator.ValidateIdAsync(name, id);
var list = await helper.ListRelatedObjectsAsync(name, id, info, false);
secondaryObjectsDictionaryCollection.Add(info.PrimaryId, secondaryObjectsList.ToList());
}
catch (Exception ex)
{
exceptions.Enqueue(ex);
}
});
我只想并行执行此任务,因为涉及不同的 mongodb 集合,并且还可以减少响应时间。
我无法弄清楚我的并行循环中出了什么问题。 并行执行这些任务的任何其他方法也将起作用。
Parallel.ForEach 与传入 async
方法不兼容。如果你想要类似于 Parallel.ForEach 的东西,你可以使用 Dataflow,它是 ActionBlock。
var workerBlock = new ActionBlock<Info>(async info =>
{
try
{
name = await commonValidator.ValidateAsync(name);
await commonValidator.ValidateIdAsync(name, id);
var list = await helper.ListRelatedObjectsAsync(name, id, info, false);
//Note this is not thread safe and you need to put a lock around it.
lock (secondaryObjectsDictionaryCollection)
{
secondaryObjectsDictionaryCollection.Add(info.PrimaryId, secondaryObjectsList.ToList());
}
}
catch (Exception ex)
{
exceptions.Enqueue(ex);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
foreach(var info in infos)
{
workerBlock.Post(info);
}
workerBlock.Complete();
让我们看一个更简单的例子来说明同样的问题
您有类似的代码
var results = new Dictionary<int, int>();
Parallel.ForEach(Enumerable.Range(0, 5), async index =>
{
var result = await DoAsyncJob(index);
results.TryAdd(index, result);
});
您的代码没有 运行 因为表达式
async index => {...}
returns 未等待的任务
像这样
Parallel.ForEach(Enumerable.Range(0, 5), index => new Task());
顺便说一下,当你像你的例子一样使用多线程时,你应该使用 ConcurrentDictionary 而不是字典,当你进行并行更新以避免错误和死锁
这里的最佳解决方案是不要使用并行循环,而是使用 Task.WhenAll
var tasks = Enumerable.Range(0, 5).Select(async index =>
{
var result = await DoAsyncJob(index);
results.TryAdd(index, result);
});
await Task.WhenAll(tasks);