如何在 TransformBlock 之后写入对象?
How to write to an object after TransformBlock?
我有一个需要并行迭代的对象列表。这是我需要做的:
foreach (var r in results)
{
r.SomeList = await apiHelper.Get(r.Id);
}
因为我想将它并行化,所以我尝试使用 Parallel.ForEach() 但它不会等到一切都真正完成,因为
apiHelper.Get() 正在其内部进行等待。
Parallel.ForEach(
results,
async (r) =>
{
r.SomeList = await apiHelper.Get(r.Id);
});
所以我在网上搜索了一下,发现了这个:
Nesting await in Parallel.ForEach
现在我是 TPL 的新手(20 分钟前),我可能遗漏了一些明显的东西。我该如何继续?
var getBlock = new TransformBlock<string, List<Something>>(
async i =>
{
var c = await apiHelper.Get(i);
return c;
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
foreach (var r in results)
{
r.SomeList = getBlock.Post(r.Id); // ERROR: Can't convert boolean to list.
}
getBlock.Complete();
这是一个使用 ActionBlock Class in the TPL dataflow 库的示例。
基本上是给你平行的,而且async
而且还比较容易理解
数据流示例
public static async Task DoWorkLoads(List<Something> results)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 50
};
var block = new ActionBlock<Something>(MyMethodAsync, options);
foreach (var result in results)
block.Post(result );
block.Complete();
await block.Completion;
}
...
public async Task MyMethodAsync(Something result)
{
result.SomeList = await apiHelper.Get(result.Id);
}
当然要查错,加胡椒粉和盐调味
此外,它假设 apiHelper
是线程安全的
或许可以考虑改用微软的 Reactive Framework。
代码如下:
var query =
from r in results.ToObservable()
from l in Observable.FromAsync(() => apiHelper.Get(r.Id))
select new { r, l };
query
.Subscribe(x => x.r.SomeList = x.l);
完成。并行和异步。
只需 NuGet "System.Reactive" 并添加 using System.Reactiive.Linq;
.
要异步和并行调用 api,您不需要 Reactive 或 Dataflow。与您所拥有的唯一复杂的是,您通过将 api 调用的结果设置为 属性 来改变对象 r
。不过,你想要的很简单:
这个:
foreach (var r in results)
{
r.SomeList = await apiHelper.Get(r.Id);
}
变成:
var tasks = results.Select(async r => { r.SomeList = await apiHelper.Get(r.Id); });
await Task.WhenAll(tasks);
假设 apiHelper.Get
实际上是非阻塞和异步的,那么 results
中的每个项目都会对 api.
进行异步和并行调用
我有一个需要并行迭代的对象列表。这是我需要做的:
foreach (var r in results)
{
r.SomeList = await apiHelper.Get(r.Id);
}
因为我想将它并行化,所以我尝试使用 Parallel.ForEach() 但它不会等到一切都真正完成,因为 apiHelper.Get() 正在其内部进行等待。
Parallel.ForEach(
results,
async (r) =>
{
r.SomeList = await apiHelper.Get(r.Id);
});
所以我在网上搜索了一下,发现了这个: Nesting await in Parallel.ForEach
现在我是 TPL 的新手(20 分钟前),我可能遗漏了一些明显的东西。我该如何继续?
var getBlock = new TransformBlock<string, List<Something>>(
async i =>
{
var c = await apiHelper.Get(i);
return c;
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
foreach (var r in results)
{
r.SomeList = getBlock.Post(r.Id); // ERROR: Can't convert boolean to list.
}
getBlock.Complete();
这是一个使用 ActionBlock Class in the TPL dataflow 库的示例。
基本上是给你平行的,而且async
而且还比较容易理解
数据流示例
public static async Task DoWorkLoads(List<Something> results)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 50
};
var block = new ActionBlock<Something>(MyMethodAsync, options);
foreach (var result in results)
block.Post(result );
block.Complete();
await block.Completion;
}
...
public async Task MyMethodAsync(Something result)
{
result.SomeList = await apiHelper.Get(result.Id);
}
当然要查错,加胡椒粉和盐调味
此外,它假设 apiHelper
是线程安全的
或许可以考虑改用微软的 Reactive Framework。
代码如下:
var query =
from r in results.ToObservable()
from l in Observable.FromAsync(() => apiHelper.Get(r.Id))
select new { r, l };
query
.Subscribe(x => x.r.SomeList = x.l);
完成。并行和异步。
只需 NuGet "System.Reactive" 并添加 using System.Reactiive.Linq;
.
要异步和并行调用 api,您不需要 Reactive 或 Dataflow。与您所拥有的唯一复杂的是,您通过将 api 调用的结果设置为 属性 来改变对象 r
。不过,你想要的很简单:
这个:
foreach (var r in results)
{
r.SomeList = await apiHelper.Get(r.Id);
}
变成:
var tasks = results.Select(async r => { r.SomeList = await apiHelper.Get(r.Id); });
await Task.WhenAll(tasks);
假设 apiHelper.Get
实际上是非阻塞和异步的,那么 results
中的每个项目都会对 api.