如何使用 TPL 数据流处理完整的输入列表?
How to process a complete list of inputs with TPL Dataflow?
我是 TPL 数据流的新手,我可以使用它,但我不确定我是否正确使用它。我有一个输入(字符串)列表,我想以最大程度的并行处理它们(全部)并知道它何时全部完成。现在我只是 foreach
通过输入并在 ActionBlock
上调用 Post
,忽略 return 值。这似乎不正确,因为它可能会错过输入。
我的问题是:如何避免丢失物品?是否有一个内置块,我可以向其提供输入并确保它们都被尝试过? (无论每个输入 success/failure。)
我看到的建议基本是:
await block.Completion;
这是否考虑了失败的输入(其中 Post
或 SendAsync
会 return 错误)?对我来说奇怪的是,这个决定似乎是在我调用 Post
时而不是之后做出的,所以这个 Completion
甚至不包括那些项目。
我觉得我基本上需要一个重试循环来处理之前无法处理的输入,类似于:
while (items.Count > 0) {
foreach (var item in items) {
if (await block.SendAsync(item)) {
items.Remove(item);
}
}
await block.Completion;
}
block.Complete();
(除了更好的循环 handling/error 检查。)
这个额外的级别是不必要的吗?还是我在概念上哪里错了?
This seems incorrect since it could miss inputs.
假设您使用的是默认值,这是正确的。 Post
只有 returns false
如果块拒绝输入。如果模块收到 Complete
信号,或者模块的输入缓冲区已满,就会发生这种情况。默认情况下,每个块的输入缓冲区可以无限增长,因此具有默认输入缓冲区大小的 ActionBlock
在调用 Complete
之后只会从 Post
return false
.
ActionBlock
最常见的用例是使用 unlimited bounded capacity 并且代码仅在添加所有项目后才调用 Complete
。在这种情况下,Post
永远不会 return false
并且您可以安全地忽略 return 值。
如果块已完成,或者块的输入缓冲区已满,方法 Post
将 return false。由于设置 BoundedCapacity
不是什么奇特的东西,并且在项目的后期很可能需要解决高 RAM 使用率的新问题,我认为使用 [=13= 不是一个安全的选择] 方法并简单地忽略结果。为了防止出现涉及丢失消息(可能是订单或发票)的无趣错误,您可以这样做:
foreach (var item in items)
{
var accepted = block.Post(item);
if (!accepted) throw new InvalidOperationException("Item was not accepted");
}
这样您至少会在出现问题时得到通知,并且不会让错误行为悄悄出现。
另一方面,等待 SendAsync
并忽略结果要安全得多。 SendAsync
通常会 return false
在发生异常或取消的情况下,在这种情况下,您会在 await
收到通知Completion
块。所以在这种情况下不需要抛出异常。
foreach (var item in items)
{
await block.SendAsync(item).ConfigureAwait(false);
}
出于性能原因,您可以同时使用 Post
和 SendAsync
。只有当您有数千万个项目要处理时,这才会有所作为。
foreach (var item in items)
{
if (!block.Post(item))
{
await block.SendAsync(item).ConfigureAwait(false);
}
}
我是 TPL 数据流的新手,我可以使用它,但我不确定我是否正确使用它。我有一个输入(字符串)列表,我想以最大程度的并行处理它们(全部)并知道它何时全部完成。现在我只是 foreach
通过输入并在 ActionBlock
上调用 Post
,忽略 return 值。这似乎不正确,因为它可能会错过输入。
我的问题是:如何避免丢失物品?是否有一个内置块,我可以向其提供输入并确保它们都被尝试过? (无论每个输入 success/failure。)
我看到的建议基本是:
await block.Completion;
这是否考虑了失败的输入(其中 Post
或 SendAsync
会 return 错误)?对我来说奇怪的是,这个决定似乎是在我调用 Post
时而不是之后做出的,所以这个 Completion
甚至不包括那些项目。
我觉得我基本上需要一个重试循环来处理之前无法处理的输入,类似于:
while (items.Count > 0) {
foreach (var item in items) {
if (await block.SendAsync(item)) {
items.Remove(item);
}
}
await block.Completion;
}
block.Complete();
(除了更好的循环 handling/error 检查。)
这个额外的级别是不必要的吗?还是我在概念上哪里错了?
This seems incorrect since it could miss inputs.
假设您使用的是默认值,这是正确的。 Post
只有 returns false
如果块拒绝输入。如果模块收到 Complete
信号,或者模块的输入缓冲区已满,就会发生这种情况。默认情况下,每个块的输入缓冲区可以无限增长,因此具有默认输入缓冲区大小的 ActionBlock
在调用 Complete
之后只会从 Post
return false
.
ActionBlock
最常见的用例是使用 unlimited bounded capacity 并且代码仅在添加所有项目后才调用 Complete
。在这种情况下,Post
永远不会 return false
并且您可以安全地忽略 return 值。
如果块已完成,或者块的输入缓冲区已满,方法 Post
将 return false。由于设置 BoundedCapacity
不是什么奇特的东西,并且在项目的后期很可能需要解决高 RAM 使用率的新问题,我认为使用 [=13= 不是一个安全的选择] 方法并简单地忽略结果。为了防止出现涉及丢失消息(可能是订单或发票)的无趣错误,您可以这样做:
foreach (var item in items)
{
var accepted = block.Post(item);
if (!accepted) throw new InvalidOperationException("Item was not accepted");
}
这样您至少会在出现问题时得到通知,并且不会让错误行为悄悄出现。
另一方面,等待 SendAsync
并忽略结果要安全得多。 SendAsync
通常会 return false
在发生异常或取消的情况下,在这种情况下,您会在 await
收到通知Completion
块。所以在这种情况下不需要抛出异常。
foreach (var item in items)
{
await block.SendAsync(item).ConfigureAwait(false);
}
出于性能原因,您可以同时使用 Post
和 SendAsync
。只有当您有数千万个项目要处理时,这才会有所作为。
foreach (var item in items)
{
if (!block.Post(item))
{
await block.SendAsync(item).ConfigureAwait(false);
}
}