如何在 TransformBlock 之后写入对象?

How to write to an object after TransformBlock?

我有一个需要并行迭代的对象列表。这是我需要做的:

foreach (var r in results)
{
    r.SomeList = await apiHelper.Get(r.Id);
}

因为我想将它并行化,所以我尝试使用 Parallel.ForEach() 但它不会等到一切都真正完成,因为 apiHelper.Get() 正在其内部进行等待。

Parallel.ForEach(
                results,
                async (r) =>
                {
                    r.SomeList = await apiHelper.Get(r.Id);
                });

所以我在网上搜索了一下,发现了这个: Nesting await in Parallel.ForEach

现在我是 TPL 的新手(20 分钟前),我可能遗漏了一些明显的东西。我该如何继续?

        var getBlock = new TransformBlock<string, List<Something>>(
            async i =>
            {
                var c = await apiHelper.Get(i);
                return c;
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        foreach (var r in results)
        {
            r.SomeList = getBlock.Post(r.Id);  // ERROR: Can't convert boolean to list.
        }

        getBlock.Complete();

这是一个使用 ActionBlock Class in the TPL dataflow 库的示例。

基本上是给你平行的,而且async而且还比较容易理解

数据流示例

public static async Task DoWorkLoads(List<Something> results)
{
   var options = new ExecutionDataflowBlockOptions
                     {
                        MaxDegreeOfParallelism = 50
                     };

   var block = new ActionBlock<Something>(MyMethodAsync, options);

   foreach (var result in results)
      block.Post(result );

   block.Complete();
   await block.Completion;

}

...

public async Task MyMethodAsync(Something result)
{       
   result.SomeList = await apiHelper.Get(result.Id);
}

当然要查错,加胡椒粉和盐调味

此外,它假设 apiHelper 是线程安全的

或许可以考虑改用微软的 Reactive Framework。

代码如下:

var query =
    from r in results.ToObservable()
    from l in Observable.FromAsync(() => apiHelper.Get(r.Id))
    select new { r, l };

query
    .Subscribe(x => x.r.SomeList = x.l);

完成。并行和异步。

只需 NuGet "System.Reactive" 并添加 using System.Reactiive.Linq;.

要异步和并行调用 api,您不需要 Reactive 或 Dataflow。与您所拥有的唯一复杂的是,您通过将 api 调用的结果设置为 属性 来改变对象 r 。不过,你想要的很简单:

这个:

foreach (var r in results)
{
    r.SomeList = await apiHelper.Get(r.Id);
}

变成:

var tasks = results.Select(async r => { r.SomeList = await apiHelper.Get(r.Id); });
await Task.WhenAll(tasks);

假设 apiHelper.Get 实际上是非阻塞和异步的,那么 results 中的每个项目都会对 api.

进行异步和并行调用