如何使用 TPL 数据流处理完整的输入列表?

How to process a complete list of inputs with TPL Dataflow?

我是 TPL 数据流的新手,我可以使用它,但我不确定我是否正确使用它。我有一个输入(字符串)列表,我想以最大程度的并行处理它们(全部)并知道它何时全部完成。现在我只是 foreach 通过输入并在 ActionBlock 上调用 Post,忽略 return 值。这似乎不正确,因为它可能会错过输入。

我的问题是:如何避免丢失物品?是否有一个内置块,我可以向其提供输入并确保它们都被尝试过? (无论每个输入 success/failure。)

我看到的建议基本是:

await block.Completion;

这是否考虑了失败的输入(其中 PostSendAsync 会 return 错误)?对我来说奇怪的是,这个决定似乎是在我调用 Post 时而不是之后做出的,所以这个 Completion 甚至不包括那些项目。

我觉得我基本上需要一个重试循环来处理之前无法处理的输入,类似于:

while (items.Count > 0) {
  foreach (var item in items) {
    if (await block.SendAsync(item)) {
      items.Remove(item);
    }
  }

  await block.Completion;
}

block.Complete();

(除了更好的循环 handling/error 检查。)

这个额外的级别是不必要的吗?还是我在概念上哪里错了?

This seems incorrect since it could miss inputs.

假设您使用的是默认值,这是正确的。 Post 只有 returns false 如果块拒绝输入。如果模块收到 Complete 信号,或者模块的输入缓冲区已满,就会发生这种情况。默认情况下,每个块的输入缓冲区可以无限增长,因此具有默认输入缓冲区大小的 ActionBlock 在调用 Complete 之后只会从 Post return false .

ActionBlock 最常见的用例是使用 unlimited bounded capacity 并且代码仅在添加所有项目后才调用 Complete。在这种情况下,Post 永远不会 return false 并且您可以安全地忽略 return 值。

如果块已完成,或者块的输入缓冲区已满,方法 Post 将 return false。由于设置 BoundedCapacity 不是什么奇特的东西,并且在项目的后期很可能需要解决高 RAM 使用率的新问题,我认为使用 [=13= 不是一个安全的选择] 方法并简单地忽略结果。为了防止出现涉及丢失消息(可能是订单或发票)的无趣错误,您可以这样做:

foreach (var item in items)
{
    var accepted = block.Post(item);
    if (!accepted) throw new InvalidOperationException("Item was not accepted");
}

这样您至少会在出现问题时得到通知,并且不会让错误行为悄悄出现。

另一方面,等待 SendAsync 并忽略结果要安全得多。 SendAsync 通常会 return false 在发生异常或取消的情况下,在这种情况下,您会在 await 收到通知Completion 块。所以在这种情况下不需要抛出异常。

foreach (var item in items)
{
    await block.SendAsync(item).ConfigureAwait(false);
}

出于性能原因,您可以同时使用 PostSendAsync。只有当您有数千万个项目要处理时,这才会有所作为。

foreach (var item in items)
{
    if (!block.Post(item))
    {
        await block.SendAsync(item).ConfigureAwait(false);
    }
}