收集结果的 C# ActionBlock 等价物

C# ActionBlock Equivalent that collects results

我目前正在使用 ActionBlock 来处理串行启动的异步作业。它可以很好地处理每个发布到它的项目,但无法从每个作业中收集结果列表。

我可以使用什么以线程安全的方式收集我的作业结果?

我的代码目前是这样的:

var actionBlock = new ActionBlock<int> (async i => await Process(i));
for(int i = 0; i < 100; i++)
{
    actionBlock.Post(i);
}
actionBlock.Complete();
await actionBlock.Completion;

我试过改用 TransformBlock,但它在等待完成时无限期挂起。完成状态为 "WaitingForActivation".

我的 TransformBlock 代码是这样的:

var transformBlock = new TransformBlock<int, string> (async i => await Process(i));
for(int i = 0; i < 100; i++)
{
    actionBlock.Post(i);
}
actionBlock.Complete();
await actionBlock.Completion;
transformBlock.TryReceiveAll(out IList<string> strings);

原来 ConcurrentBag 就是答案

var bag = new ConcurrentBag<string>();
var actionBlock = new ActionBlock<int> (async i => 
   bag.Add(await Process(i))
);
for(int i = 0; i < 100; i++)
{
    actionBlock.Post(i);
}
actionBlock.Complete();
await actionBlock.Completion;

现在 'bag' 中包含所有结果,并且可以作为 IEnumerable 访问。

我实际最终使用的代码使用 Parallel.ForEach 而不是 ActionBlock。

Parallel.ForEach
(
    inputData, 
    i => bag.Add(await Process(i))
);

这要简单得多,但在性能方面似乎同样好,并且仍然有限制并行度等的选项。