将异步方法传递给 Parallel.ForEach

Passing async method into Parallel.ForEach

我正在阅读 关于 Parallel.ForEach 的内容,其中指出 "Parallel.ForEach is not compatible with passing in a async method."

所以,为了检查我写了这段代码:

static async Task Main(string[] args)
{
    var results = new ConcurrentDictionary<string, int>();

    Parallel.ForEach(Enumerable.Range(0, 100), async index =>
    {
        var res = await DoAsyncJob(index);
        results.TryAdd(index.ToString(), res);
    });         

    Console.ReadLine();
}

static async Task<int> DoAsyncJob(int i)
{
    Thread.Sleep(100);
    return await Task.FromResult(i * 10);
}

这段代码同时填充results字典。

顺便说一下,我创建了一个类型为 ConcurrentDictionary<string, int> 的字典,因为万一我有 ConcurrentDictionary<int, int> 当我在调试模式下探索它的元素时,我看到元素是按键排序的,我想因此添加了元素。

那么,我想知道我的代码是否有效?如果它 "is not compatible with passing in a async method" 为什么它运作良好?

async 方法是一种启动并且 return 是 Task.

的方法

你的代码在这里

Parallel.ForEach(Enumerable.Range(0, 100), async index =>
{
    var res = await DoAsyncJob(index);
    results.TryAdd(index.ToString(), res);
});        

并行运行异步方法 100 次。也就是说,它并行化任务 creation,而不是整个任务。到 ForEach 完成 return 时,您的任务已 运行,但不一定完成。

您的代码有效,因为 DoAsyncJob() 实际上不是异步的 - 您的任务在 return 时完成。 Thread.Sleep() 是一种同步方法。 Task.Delay() 是它的异步等价物。

理解CPU-bound and I/O-bound operations的区别。正如其他人已经指出的那样,并行性(和 Parallel.ForEach)适用于 CPU 绑定操作,异步编程是不合适的。

此代码之所以有效,是因为 DoAsyncJob 并不是真正的异步方法。 async 不会使方法异步工作。等待 Task.FromResult 返回的已完成任务也是同步的。 async Task Main 不包含任何导致编译器警告的异步代码。

演示 Parallel.ForEach 如何不适用于异步方法的示例应调用真正的异步方法:

    static async Task Main(string[] args)
    {
        var results = new ConcurrentDictionary<string, int>();

        Parallel.ForEach(Enumerable.Range(0, 100), async index =>
        {
            var res = await DoAsyncJob(index);
            results.TryAdd(index.ToString(), res);
        });  
        Console.WriteLine($"Items in dictionary {results.Count}");
    }

    static async Task<int> DoAsyncJob(int i)
    {
        await Task.Delay(100);
        return i * 10;
    }

结果将是

Items in dictionary 0

Parallel.ForEach 没有接受 Func<Task> 的重载,它只接受 Action 代表。这意味着它不能等待任何异步操作。

async index 被接受是因为它隐含地是一个 async void delegate。就 Parallel.ForEach 而言,它只是一个 Action<int>.

结果是 Parallel.ForEach 触发了 100 个任务并且从不等待它们完成。这就是为什么当应用程序终止时字典仍然是空的。

如果您已经有异步工作,则不需要 Parallel.ForEach:

static async Task Main(string[] args)
{

    var results = await new Task.WhenAll(
        Enumerable.Range(0, 100)
        Select(i => DoAsyncJob(I)));

    Console.ReadLine();
}

关于你的异步作业,你要么一直异步:

static async Task<int> DoAsyncJob(int i)
{
    await Task.Delay(100);
    return await Task.FromResult(i * 10);
}

更好的是:

static async Task<int> DoAsyncJob(int i)
{
    await Task.Delay(100);
    return i * 10;
}

或根本没有:

static Task<int> DoAsyncJob(int i)
{
    Thread.Sleep(100);
    return Task.FromResult(i * 10);
}