在 C#8 IAsyncEnumerable<T> 中并行化 yield return
Parallelize yield return inside C#8 IAsyncEnumerable<T>
我有一个方法return是一个异步枚举器
public async IAsyncEnumerable<IResult> DoWorkAsync()
{
await Something();
foreach (var item in ListOfWorkItems)
{
yield return DoWork(item);
}
}
来电者:
public async Task LogResultsAsync()
{
await foreach (var result in DoWorkAsync())
{
Console.WriteLine(result);
}
}
因为 DoWork
是一个昂贵的操作,我更愿意以某种方式并行化它,所以它的工作方式类似于:
public async IAsyncEnumerable<IResult> DoWorkAsync()
{
await Something();
Parallel.ForEach(ListOfWorkItems, item =>
{
yield return DoWork(item);
});
}
但是我无法从内部执行 yield return Parallel.Foreach
所以想知道最好的方法是什么?
returned 结果的顺序无关紧要。
谢谢。
编辑: 抱歉,我在 DoWorkAsync
中遗漏了一些代码,它确实在等待一些东西,我只是没有把它放在上面的代码中,因为那不是与问题非常相关。现已更新
Edit2: DoWork
在我的例子中主要是 I/O 绑定,它正在从数据库中读取数据。
根据 canton7 的建议,您可以使用 AsParallel
而不是 Parallel.ForEach
。
这可以在标准 foreach
循环中使用,您可以在其中产生结果:
public async IAsyncEnumerable<IResult> DoWorkAsync()
{
await Something();
foreach (var result in ListOfWorkItems.AsParallel().Select(DoWork))
{
yield return result;
}
}
正如 Theodor Zoulias 所提到的,可枚举 returned 实际上根本不是异步的。
如果您只需要使用 await foreach
使用它,这应该不是问题,但更明确地说,您可以 return IEnumerable
并让调用者并行化它:
public async Task<IEnumerable<Item>> DoWorkAsync()
{
await Something();
return ListOfWorkItems;
}
// Caller...
Parallel.ForEach(await DoWorkAsync(), item =>
{
var result = DoWork(item);
//...
});
虽然如果需要在多个地方调用它可能不太容易维护
这是一个使用 TransformBlock
frοm the TPL Dataflow 库的基本实现:
public async IAsyncEnumerable<IResult> GetResults(List<IWorkItem> workItems)
{
// Define the dataflow block
var block = new TransformBlock<IWorkItem, IResult>(async item =>
{
return await TransformAsync(item);
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 10, // the default is 1
EnsureOrdered = false // the default is true
});
// Feed the block with input data
foreach (var item in workItems)
{
block.Post(item);
}
block.Complete();
// Stream the block's output as IAsyncEnumerable
while (await block.OutputAvailableAsync())
{
while (block.TryReceive(out var result))
{
yield return result;
}
}
// Propagate possible exceptions
await block.Completion;
}
此实现并不完美,因为如果 IAsyncEnumerable
的使用者过早地放弃枚举,TransformBlock
将继续在后台工作,直到处理完所有工作项。它也不支持取消,所有可敬的 IAsyncEnumerable
生产方法都应该支持。这些缺少的功能可以相对容易地添加。如果您有兴趣添加它们,请查看 this 问题。
我有一个方法return是一个异步枚举器
public async IAsyncEnumerable<IResult> DoWorkAsync()
{
await Something();
foreach (var item in ListOfWorkItems)
{
yield return DoWork(item);
}
}
来电者:
public async Task LogResultsAsync()
{
await foreach (var result in DoWorkAsync())
{
Console.WriteLine(result);
}
}
因为 DoWork
是一个昂贵的操作,我更愿意以某种方式并行化它,所以它的工作方式类似于:
public async IAsyncEnumerable<IResult> DoWorkAsync()
{
await Something();
Parallel.ForEach(ListOfWorkItems, item =>
{
yield return DoWork(item);
});
}
但是我无法从内部执行 yield return Parallel.Foreach
所以想知道最好的方法是什么?
returned 结果的顺序无关紧要。
谢谢。
编辑: 抱歉,我在 DoWorkAsync
中遗漏了一些代码,它确实在等待一些东西,我只是没有把它放在上面的代码中,因为那不是与问题非常相关。现已更新
Edit2: DoWork
在我的例子中主要是 I/O 绑定,它正在从数据库中读取数据。
根据 canton7 的建议,您可以使用 AsParallel
而不是 Parallel.ForEach
。
这可以在标准 foreach
循环中使用,您可以在其中产生结果:
public async IAsyncEnumerable<IResult> DoWorkAsync()
{
await Something();
foreach (var result in ListOfWorkItems.AsParallel().Select(DoWork))
{
yield return result;
}
}
正如 Theodor Zoulias 所提到的,可枚举 returned 实际上根本不是异步的。
如果您只需要使用 await foreach
使用它,这应该不是问题,但更明确地说,您可以 return IEnumerable
并让调用者并行化它:
public async Task<IEnumerable<Item>> DoWorkAsync()
{
await Something();
return ListOfWorkItems;
}
// Caller...
Parallel.ForEach(await DoWorkAsync(), item =>
{
var result = DoWork(item);
//...
});
虽然如果需要在多个地方调用它可能不太容易维护
这是一个使用 TransformBlock
frοm the TPL Dataflow 库的基本实现:
public async IAsyncEnumerable<IResult> GetResults(List<IWorkItem> workItems)
{
// Define the dataflow block
var block = new TransformBlock<IWorkItem, IResult>(async item =>
{
return await TransformAsync(item);
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 10, // the default is 1
EnsureOrdered = false // the default is true
});
// Feed the block with input data
foreach (var item in workItems)
{
block.Post(item);
}
block.Complete();
// Stream the block's output as IAsyncEnumerable
while (await block.OutputAvailableAsync())
{
while (block.TryReceive(out var result))
{
yield return result;
}
}
// Propagate possible exceptions
await block.Completion;
}
此实现并不完美,因为如果 IAsyncEnumerable
的使用者过早地放弃枚举,TransformBlock
将继续在后台工作,直到处理完所有工作项。它也不支持取消,所有可敬的 IAsyncEnumerable
生产方法都应该支持。这些缺少的功能可以相对容易地添加。如果您有兴趣添加它们,请查看 this 问题。