如何使用 BatchBlock 正确完成分支流水线?
How to complete correctly a branched pipeline with BatchBlock?
考虑以下管道:
public static void Main()
{
var firstBlock = new TransformBlock<string, string>(s =>
{
Console.WriteLine("FirstBlock");
return s + "-FirstBlock";
});
var batchBlock = new BatchBlock<string>(100, new GroupingDataflowBlockOptions { Greedy = true });
var afterBatchBlock = new TransformManyBlock<string[], string>(strings =>
{
Console.WriteLine("AfterBatchBlock");
return new[] { strings[0] + "-AfterBatchBlock" };
});
var lastBlock = new ActionBlock<string>(s =>
{
Console.WriteLine($"LastBlock {s}");
});
firstBlock.LinkTo(batchBlock, new DataflowLinkOptions { PropagateCompletion = true }, x => x.Contains("0"));
batchBlock.LinkTo(afterBatchBlock, new DataflowLinkOptions { PropagateCompletion = true });
afterBatchBlock.LinkTo(lastBlock, new DataflowLinkOptions { PropagateCompletion = true });
firstBlock.LinkTo(lastBlock, new DataflowLinkOptions { PropagateCompletion = true });
firstBlock.Post("0");
firstBlock.Complete();
firstBlock.Completion.GetAwaiter().GetResult();
batchBlock.Completion.GetAwaiter().GetResult();
afterBatchBlock.Completion.GetAwaiter().GetResult();
lastBlock.Completion.GetAwaiter().GetResult();
}
运行 此代码被以下输出阻止:
FirstBlock
AfterBatchBlock
我认为幕后发生的事情如下:
- FirstBlock 向其所有链接目标(BatchBlock、AfterBatchBlock、LastBlock)发送完成信号
- BatchBlock 和 LastBlock 完成
- AfterBatchBlock 然后尝试将其输出传递给 LastBlock,但它已经完成并且永远卡住了。
我的问题是:
- 这是一个错误吗?
- 假设这是设计使然,建议的方法是什么来克服我的管道达到的不良状态
不,这不是错误。这是预期的行为。您必须首先意识到您不是在处理简单、直接的数据流管道,而是在处理数据流网格。它不是一个复杂的网格,但仍然不是我所说的管道。处理形成网格的数据流块的完成可能很棘手。在您的例子中,您有一个块 lastBlock
,它是两个源块 firstBlock
和 afterBatchBlock
的目标。当两个个源块都完成时,这个块应该完成,而不是其中一个。因此,在将此块与其源链接时不能使用 PropagateCompletion = true
选项。
没有内置的 API 可以做到这一点,但是实现二对一的完成传播器并不是很困难。事实上,您可以复制粘贴 中的 PropagateCompletion
方法,然后像这样使用它:
afterBatchBlock.LinkTo(lastBlock);
firstBlock.LinkTo(lastBlock);
PropagateCompletion(new IDataflowBlock[] { afterBatchBlock, firstBlock },
new[] { lastBlock });
考虑以下管道:
public static void Main()
{
var firstBlock = new TransformBlock<string, string>(s =>
{
Console.WriteLine("FirstBlock");
return s + "-FirstBlock";
});
var batchBlock = new BatchBlock<string>(100, new GroupingDataflowBlockOptions { Greedy = true });
var afterBatchBlock = new TransformManyBlock<string[], string>(strings =>
{
Console.WriteLine("AfterBatchBlock");
return new[] { strings[0] + "-AfterBatchBlock" };
});
var lastBlock = new ActionBlock<string>(s =>
{
Console.WriteLine($"LastBlock {s}");
});
firstBlock.LinkTo(batchBlock, new DataflowLinkOptions { PropagateCompletion = true }, x => x.Contains("0"));
batchBlock.LinkTo(afterBatchBlock, new DataflowLinkOptions { PropagateCompletion = true });
afterBatchBlock.LinkTo(lastBlock, new DataflowLinkOptions { PropagateCompletion = true });
firstBlock.LinkTo(lastBlock, new DataflowLinkOptions { PropagateCompletion = true });
firstBlock.Post("0");
firstBlock.Complete();
firstBlock.Completion.GetAwaiter().GetResult();
batchBlock.Completion.GetAwaiter().GetResult();
afterBatchBlock.Completion.GetAwaiter().GetResult();
lastBlock.Completion.GetAwaiter().GetResult();
}
运行 此代码被以下输出阻止:
FirstBlock
AfterBatchBlock
我认为幕后发生的事情如下:
- FirstBlock 向其所有链接目标(BatchBlock、AfterBatchBlock、LastBlock)发送完成信号
- BatchBlock 和 LastBlock 完成
- AfterBatchBlock 然后尝试将其输出传递给 LastBlock,但它已经完成并且永远卡住了。
我的问题是:
- 这是一个错误吗?
- 假设这是设计使然,建议的方法是什么来克服我的管道达到的不良状态
不,这不是错误。这是预期的行为。您必须首先意识到您不是在处理简单、直接的数据流管道,而是在处理数据流网格。它不是一个复杂的网格,但仍然不是我所说的管道。处理形成网格的数据流块的完成可能很棘手。在您的例子中,您有一个块 lastBlock
,它是两个源块 firstBlock
和 afterBatchBlock
的目标。当两个个源块都完成时,这个块应该完成,而不是其中一个。因此,在将此块与其源链接时不能使用 PropagateCompletion = true
选项。
没有内置的 API 可以做到这一点,但是实现二对一的完成传播器并不是很困难。事实上,您可以复制粘贴 PropagateCompletion
方法,然后像这样使用它:
afterBatchBlock.LinkTo(lastBlock);
firstBlock.LinkTo(lastBlock);
PropagateCompletion(new IDataflowBlock[] { afterBatchBlock, firstBlock },
new[] { lastBlock });