在 C#8 IAsyncEnumerable<T> 中并行化 yield return

Parallelize yield return inside C#8 IAsyncEnumerable<T>

我有一个方法return是一个异步枚举器

    public async IAsyncEnumerable<IResult> DoWorkAsync()
    {
        await Something();
        foreach (var item in ListOfWorkItems)
        {
            yield return DoWork(item);
        }
    }

来电者:

    public async Task LogResultsAsync()
    {
        await foreach (var result in DoWorkAsync())
        {
            Console.WriteLine(result);
        }
    }

因为 DoWork 是一个昂贵的操作,我更愿意以某种方式并行化它,所以它的工作方式类似于:

    public async IAsyncEnumerable<IResult> DoWorkAsync()
    {
        await Something();
        Parallel.ForEach(ListOfWorkItems, item =>
        {
            yield return DoWork(item);
        });
    }

但是我无法从内部执行 yield return Parallel.Foreach 所以想知道最好的方法是什么?

returned 结果的顺序无关紧要。

谢谢。

编辑: 抱歉,我在 DoWorkAsync 中遗漏了一些代码,它确实在等待一些东西,我只是没有把它放在上面的代码中,因为那不是与问题非常相关。现已更新

Edit2: DoWork 在我的例子中主要是 I/O 绑定,它正在从数据库中读取数据。

根据 canton7 的建议,您可以使用 AsParallel 而不是 Parallel.ForEach

这可以在标准 foreach 循环中使用,您可以在其中产生结果:

public async IAsyncEnumerable<IResult> DoWorkAsync()
{
    await Something();
    foreach (var result in ListOfWorkItems.AsParallel().Select(DoWork))
    {
        yield return result;
    }
}

正如 Theodor Zoulias 所提到的,可枚举 returned 实际上根本不是异步的。

如果您只需要使用 await foreach 使用它,这应该不是问题,但更明确地说,您可以 return IEnumerable 并让调用者并行化它:

public async Task<IEnumerable<Item>> DoWorkAsync()
{
    await Something();
    return ListOfWorkItems;
}

// Caller...
Parallel.ForEach(await DoWorkAsync(), item => 
{
    var result = DoWork(item);
    //...
});

虽然如果需要在多个地方调用它可能不太容易维护

这是一个使用 TransformBlock frοm the TPL Dataflow 库的基本实现:

public async IAsyncEnumerable<IResult> GetResults(List<IWorkItem> workItems)
{
    // Define the dataflow block
    var block = new TransformBlock<IWorkItem, IResult>(async item =>
    {
        return await TransformAsync(item);
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 10, // the default is 1
        EnsureOrdered = false // the default is true
    });

    // Feed the block with input data
    foreach (var item in workItems)
    {
        block.Post(item);
    }
    block.Complete();

    // Stream the block's output as IAsyncEnumerable
    while (await block.OutputAvailableAsync())
    {
        while (block.TryReceive(out var result))
        {
            yield return result;
        }
    }

    // Propagate possible exceptions
    await block.Completion;
}

此实现并不完美,因为如果 IAsyncEnumerable 的使用者过早地放弃枚举,TransformBlock 将继续在后台工作,直到处理完所有工作项。它也不支持取消,所有可敬的 IAsyncEnumerable 生产方法都应该支持。这些缺少的功能可以相对容易地添加。如果您有兴趣添加它们,请查看 this 问题。