Parallel.ForEach 中的嵌套异步方法
Nested async methods in a Parallel.ForEach
我有一个方法,其中包含 运行 多个异步方法。我必须遍历设备列表,并将设备传递给此方法。我注意到这需要很长时间才能完成,所以我正在考虑使用 Parallel.ForEach
以便它可以 运行 同时针对多个设备执行此过程。
假设这是我的方法。
public async Task ProcessDevice(Device device) {
var dev = await _deviceService.LookupDeviceIndbAsNoTracking(device);
var result = await DoSomething(dev);
await DoSomething2(dev);
}
然后 DoSomething2 也调用一个异步方法。
public async Task DoSomething2(Device dev) {
foreach(var obj in dev.Objects) {
await DoSomething3(obj);
}
}
随着时间的推移,设备列表不断变大,因此该列表增长得越多,程序完成对每个设备的运行宁ProcessDevice()
所需的时间就越长。我想一次处理多个设备。所以我一直在研究使用 Parallel.ForEach
.
Parallel.ForEach(devices, async device => {
try {
await ProcessDevice(device);
} catch (Exception ex) {
throw ex;
}
})
似乎程序在设备完全处理之前就完成了。我还尝试创建一个任务列表,然后为每个设备添加一个新任务 运行ning ProcessDevice 到该列表,然后等待 Task.WhenAll(listOfTasks);
var listOfTasks = new List<Task>();
foreach(var device in devices) {
var task = Task.Run(async () => await ProcessDevice(device));
listOfTasks.Add(task);
}
await Task.WhenAll(listOfTasks);
但任务似乎在 ProcessDevice()
实际完成之前标记为已完成 运行ning。
请原谅我对这个问题的无知,因为我是并行处理的新手,不确定发生了什么。是什么导致了这种行为?您是否可以提供任何文档来帮助我更好地理解该怎么做?
在你的最后一个例子中有几个问题:
var listOfTasks = new List<Task>();
foreach (var device in devices)
{
await Task.Run(async () => await ProcessDevice(device));
}
await Task.WhenAll(listOfTasks);
执行 await Task.Run(async () => await ProcessDevice(device));
意味着在前一个循环完成之前,您不会移动到 foreach
循环的下一个迭代。本质上,您仍然一次只做一个。
此外,您没有向 listOfTasks
添加任何任务,因此它仍然是空的,因此 Task.WhenAll(listOfTasks)
立即完成,因为没有任务要等待。
试试这个:
var listOfTasks = new List<Task>();
foreach (var device in devices)
{
var task = Task.Run(async () => await ProcessDevice(device))
listOfTasks.Add(task);
}
await Task.WhenAll(listOfTasks);
不太确定这是不是你要的,但我可以举例说明我们如何启动异步进程
private readonly Func<Worker> _worker;
private void StartWorkers(IEnumerable<Props> props){
Parallel.ForEach(props, timestamp => { _worker.Invoke().Consume(timestamp); });
}
建议阅读有关 Parallel.ForEach 的内容,因为它会对您有所帮助。
您不能将 async
与 Parallel.ForEach
混用。由于您的底层操作是异步的,因此您希望使用异步并发,而不是并行。异步并发最容易表达为WhenAll
:
var listOfTasks = devices.Select(ProcessDevice).ToList();
await Task.WhenAll(listOfTasks);
我可以用Parallel.ForEach
解释问题。需要理解的重要一点是,当 await
关键字作用于不完整的 Task
时,它 returns。如果方法签名允许(如果不是 void
),它将 return 自己的不完整 Task
。然后由调用者使用 Task
对象等待作业完成。
但是Parallel.ForEach
is an Action<T>
中的第二个参数是void
方法,也就是说Task
不能被return调用,也就是说调用者(Parallel.ForEach
在这种情况下)无法等到作业完成。
因此,在您的情况下,一旦它达到 await ProcessDevice(device)
,它就会 return 并且没有任何等待它完成的事情,因此它会开始下一次迭代。当 Parallel.ForEach
完成时,它所做的只是 启动了 所有任务,而不是等待它们。
所以不要将 Parallel.ForEach
与异步代码一起使用。
斯蒂芬的回答更合适。您也可以使用 WSC 的答案,但这对于较大的列表来说可能很危险。一次创建数百或数千个新线程对您的性能没有帮助。
我有一个方法,其中包含 运行 多个异步方法。我必须遍历设备列表,并将设备传递给此方法。我注意到这需要很长时间才能完成,所以我正在考虑使用 Parallel.ForEach
以便它可以 运行 同时针对多个设备执行此过程。
假设这是我的方法。
public async Task ProcessDevice(Device device) {
var dev = await _deviceService.LookupDeviceIndbAsNoTracking(device);
var result = await DoSomething(dev);
await DoSomething2(dev);
}
然后 DoSomething2 也调用一个异步方法。
public async Task DoSomething2(Device dev) {
foreach(var obj in dev.Objects) {
await DoSomething3(obj);
}
}
随着时间的推移,设备列表不断变大,因此该列表增长得越多,程序完成对每个设备的运行宁ProcessDevice()
所需的时间就越长。我想一次处理多个设备。所以我一直在研究使用 Parallel.ForEach
.
Parallel.ForEach(devices, async device => {
try {
await ProcessDevice(device);
} catch (Exception ex) {
throw ex;
}
})
似乎程序在设备完全处理之前就完成了。我还尝试创建一个任务列表,然后为每个设备添加一个新任务 运行ning ProcessDevice 到该列表,然后等待 Task.WhenAll(listOfTasks);
var listOfTasks = new List<Task>();
foreach(var device in devices) {
var task = Task.Run(async () => await ProcessDevice(device));
listOfTasks.Add(task);
}
await Task.WhenAll(listOfTasks);
但任务似乎在 ProcessDevice()
实际完成之前标记为已完成 运行ning。
请原谅我对这个问题的无知,因为我是并行处理的新手,不确定发生了什么。是什么导致了这种行为?您是否可以提供任何文档来帮助我更好地理解该怎么做?
在你的最后一个例子中有几个问题:
var listOfTasks = new List<Task>();
foreach (var device in devices)
{
await Task.Run(async () => await ProcessDevice(device));
}
await Task.WhenAll(listOfTasks);
执行 await Task.Run(async () => await ProcessDevice(device));
意味着在前一个循环完成之前,您不会移动到 foreach
循环的下一个迭代。本质上,您仍然一次只做一个。
此外,您没有向 listOfTasks
添加任何任务,因此它仍然是空的,因此 Task.WhenAll(listOfTasks)
立即完成,因为没有任务要等待。
试试这个:
var listOfTasks = new List<Task>();
foreach (var device in devices)
{
var task = Task.Run(async () => await ProcessDevice(device))
listOfTasks.Add(task);
}
await Task.WhenAll(listOfTasks);
不太确定这是不是你要的,但我可以举例说明我们如何启动异步进程
private readonly Func<Worker> _worker;
private void StartWorkers(IEnumerable<Props> props){
Parallel.ForEach(props, timestamp => { _worker.Invoke().Consume(timestamp); });
}
建议阅读有关 Parallel.ForEach 的内容,因为它会对您有所帮助。
您不能将 async
与 Parallel.ForEach
混用。由于您的底层操作是异步的,因此您希望使用异步并发,而不是并行。异步并发最容易表达为WhenAll
:
var listOfTasks = devices.Select(ProcessDevice).ToList();
await Task.WhenAll(listOfTasks);
我可以用Parallel.ForEach
解释问题。需要理解的重要一点是,当 await
关键字作用于不完整的 Task
时,它 returns。如果方法签名允许(如果不是 void
),它将 return 自己的不完整 Task
。然后由调用者使用 Task
对象等待作业完成。
但是Parallel.ForEach
is an Action<T>
中的第二个参数是void
方法,也就是说Task
不能被return调用,也就是说调用者(Parallel.ForEach
在这种情况下)无法等到作业完成。
因此,在您的情况下,一旦它达到 await ProcessDevice(device)
,它就会 return 并且没有任何等待它完成的事情,因此它会开始下一次迭代。当 Parallel.ForEach
完成时,它所做的只是 启动了 所有任务,而不是等待它们。
所以不要将 Parallel.ForEach
与异步代码一起使用。
斯蒂芬的回答更合适。您也可以使用 WSC 的答案,但这对于较大的列表来说可能很危险。一次创建数百或数千个新线程对您的性能没有帮助。