如何使用 BatchBlock 正确完成分支流水线?

How to complete correctly a branched pipeline with BatchBlock?

考虑以下管道:

public static void Main()
{
    var firstBlock = new TransformBlock<string, string>(s =>
    {
         Console.WriteLine("FirstBlock");
         return s + "-FirstBlock";
    });
    var batchBlock = new BatchBlock<string>(100, new GroupingDataflowBlockOptions { Greedy = true });
    var afterBatchBlock = new TransformManyBlock<string[], string>(strings =>
    {
        Console.WriteLine("AfterBatchBlock");
        return new[] { strings[0] + "-AfterBatchBlock" };
    });
    var lastBlock = new ActionBlock<string>(s =>
    {
        Console.WriteLine($"LastBlock {s}");
    });
    firstBlock.LinkTo(batchBlock, new DataflowLinkOptions { PropagateCompletion = true }, x => x.Contains("0"));
    batchBlock.LinkTo(afterBatchBlock, new DataflowLinkOptions { PropagateCompletion = true });
        
    afterBatchBlock.LinkTo(lastBlock, new DataflowLinkOptions { PropagateCompletion = true });
        firstBlock.LinkTo(lastBlock, new DataflowLinkOptions { PropagateCompletion = true });

    firstBlock.Post("0");

    firstBlock.Complete();
    firstBlock.Completion.GetAwaiter().GetResult();
    batchBlock.Completion.GetAwaiter().GetResult();
    afterBatchBlock.Completion.GetAwaiter().GetResult();
    lastBlock.Completion.GetAwaiter().GetResult();
}

运行 此代码被以下输出阻止:

FirstBlock
AfterBatchBlock

我认为幕后发生的事情如下:

我的问题是:

  1. 这是一个错误吗?
  2. 假设这是设计使然,建议的方法是什么来克服我的管道达到的不良状态

不,这不是错误。这是预期的行为。您必须首先意识到您不是在处理简单、直接的数据流管道,而是在处理数据流网格。它不是一个复杂的网格,但仍然不是我所说的管道。处理形成网格的数据流块的完成可能很棘手。在您的例子中,您有一个块 lastBlock,它是两个源块 firstBlockafterBatchBlock 的目标。当两个个源块都完成时,这个块应该完成,而不是其中一个。因此,在将此块与其源链接时不能使用 PropagateCompletion = true 选项。

没有内置的 API 可以做到这一点,但是实现二对一的完成传播器并不是很困难。事实上,您可以复制粘贴 中的 PropagateCompletion 方法,然后像这样使用它:

afterBatchBlock.LinkTo(lastBlock);
firstBlock.LinkTo(lastBlock);

PropagateCompletion(new IDataflowBlock[] { afterBatchBlock, firstBlock },
    new[] { lastBlock });