Parallel.ForEach 中的嵌套异步方法

Nested async methods in a Parallel.ForEach

我有一个方法,其中包含 运行 多个异步方法。我必须遍历设备列表,并将设备传递给此方法。我注意到这需要很长时间才能完成,所以我正在考虑使用 Parallel.ForEach 以便它可以 运行 同时针对多个设备执行此过程。

假设这是我的方法。

public async Task ProcessDevice(Device device) {
    var dev = await _deviceService.LookupDeviceIndbAsNoTracking(device);

    var result = await DoSomething(dev);
    await DoSomething2(dev);
}

然后 DoSomething2 也调用一个异步方法。

public async Task DoSomething2(Device dev) {
    foreach(var obj in dev.Objects) {
        await DoSomething3(obj);
    }
}

随着时间的推移,设备列表不断变大,因此该列表增长得越多,程序完成对每个设备的运行宁ProcessDevice()所需的时间就越长。我想一次处理多个设备。所以我一直在研究使用 Parallel.ForEach.

Parallel.ForEach(devices, async device => {
    try {
        await ProcessDevice(device);
    } catch (Exception ex) {
        throw ex;
    }
})

似乎程序在设备完全处理之前就完成了。我还尝试创建一个任务列表,然后为每个设备添加一个新任务 运行ning ProcessDevice 到该列表,然后等待 Task.WhenAll(listOfTasks);

var listOfTasks = new List<Task>();
foreach(var device in devices) {
    var task = Task.Run(async () => await ProcessDevice(device));
    listOfTasks.Add(task);
}
await Task.WhenAll(listOfTasks);

但任务似乎在 ProcessDevice() 实际完成之前标记为已完成 运行ning。

请原谅我对这个问题的无知,因为我是并行处理的新手,不确定发生了什么。是什么导致了这种行为?您是否可以提供任何文档来帮助我更好地理解该怎么做?

在你的最后一个例子中有几个问题:

var listOfTasks = new List<Task>();
foreach (var device in devices)
{
    await  Task.Run(async () => await ProcessDevice(device));
}
await Task.WhenAll(listOfTasks);

执行 await Task.Run(async () => await ProcessDevice(device)); 意味着在前一个循环完成之前,您不会移动到 foreach 循环的下一个迭代。本质上,您仍然一次只做一个。

此外,您没有向 listOfTasks 添加任何任务,因此它仍然是空的,因此 Task.WhenAll(listOfTasks) 立即完成,因为没有任务要等待。

试试这个:

var listOfTasks = new List<Task>();
foreach (var device in devices)
{
    var task = Task.Run(async () => await ProcessDevice(device))
    listOfTasks.Add(task);
}
await Task.WhenAll(listOfTasks);

不太确定这是不是你要的,但我可以举例说明我们如何启动异步进程

 private readonly Func<Worker> _worker;

    private void StartWorkers(IEnumerable<Props> props){
    Parallel.ForEach(props, timestamp => { _worker.Invoke().Consume(timestamp); });
    }

建议阅读有关 Parallel.ForEach 的内容,因为它会对您有所帮助。

您不能将 asyncParallel.ForEach 混用。由于您的底层操作是异步的,因此您希望使用异步并发,而不是并行。异步并发最容易表达为WhenAll:

var listOfTasks = devices.Select(ProcessDevice).ToList();
await Task.WhenAll(listOfTasks);

我可以用Parallel.ForEach解释问题。需要理解的重要一点是,当 await 关键字作用于不完整的 Task 时,它 returns。如果方法签名允许(如果不是 void),它将 return 自己的不完整 Task。然后由调用者使用 Task 对象等待作业完成。

但是Parallel.ForEach is an Action<T>中的第二个参数是void方法,也就是说Task不能被return调用,也就是说调用者(Parallel.ForEach 在这种情况下)无法等到作​​业完成。

因此,在您的情况下,一旦它达到 await ProcessDevice(device),它就会 return 并且没有任何等待它完成的事情,因此它会开始下一次迭代。当 Parallel.ForEach 完成时,它所做的只是 启动了 所有任务,而不是等待它们。

所以不要将 Parallel.ForEach 与异步代码一起使用。

斯蒂芬的回答更合适。您也可以使用 WSC 的答案,但这对于较大的列表来说可能很危险。一次创建数百或数千个新线程对您的性能没有帮助。