传播完成

Propagating completion

我有一个非常基本的线性管道,我想在其中传播完成并等待一切完成:

static void Main(string[] args)
{
    ExecutePipeline().Wait();
}

static async Task ExecutePipeline()
{
    var addBlock = new TransformBlock<int, int>(x =>
    {
        var result = x + 2;
        Console.WriteLine(result);
        return result;
    });
    var subBlock = new TransformBlock<int, int>(x =>
    {
        var result = x - 2;
        Console.WriteLine(result);
        return result;
    });
    var mulBlock = new TransformBlock<int, int>(x =>
    {
        var result = x * 2;
        Console.WriteLine(result);
        return result;
    });
    var divBlock = new TransformBlock<int, int>(x =>
    {
        var result = x / 2;
        Console.WriteLine(result);
        return result;
    });

    var flowOptions = new DataflowLinkOptions { PropagateCompletion = true };
    addBlock.LinkTo(mulBlock, flowOptions);
    mulBlock.LinkTo(subBlock, flowOptions);
    subBlock.LinkTo(divBlock, flowOptions);

    addBlock.Post(4);
    addBlock.Complete();
    mulBlock.Complete();
    subBlock.Complete();
    await divBlock.Completion;
}

不幸的是,在当前状态下,只有 addBlock 的结果被打印出来,程序终止,而不是在终止前打印所有结果。

如果我注释掉所有在它们的块上调用 Complete() 的行,或者如果我不注释 addBlock.Complete(),我会在管道中打印出所有结果,但程序永远不会结束,因为不传播完成。但是,如果我取消阻止 mulBlock.Complete()subBlock.Complete(),与默认代码的行为方式类似,程序会打印出 addBlock 的结果并终止。

有趣的是,取消注释最后提到的这两个块中的任何一个或所有块都具有相同的行为,这让我质疑如果其中一个块被注释,完成将如何传播。显然,我在逻辑上遗漏了一些东西,但我无法弄清楚它是什么。我将如何完成打印所有结果的预期行为?

编辑:

所以,我终于在

找到了适合我的东西

看来我需要将最后一段代码简单地更改为:

addBlock.Post(4);
addBlock.Complete();
await addBlock.Completion;

原始代码不起作用,因为 Complete() 在数据传播之前在每个块上调用,所以这是一个竞争条件的情况。

但是,使用这个新编辑的代码,它会在 addBlock 上调用 Complete() 并等待其完成。这使程序按预期工作,但让我更加困惑。为什么 Completion 必须从 addBlock 等待,而不是从链中的最后一个区块 divBlock 等待?我认为 Completion() 仅在 addBlock 上调用,因为 PropagationCompletion 设置为 true,但我认为我们会等待最后一个块的完成,而不是第一个。

如果我等待 mulBlock 完成,那么只会打印 addBlock 的结果。如果我等待 subBlock 完成,则会打印 addBlockmulBlock 的结果。如果我等待完成 divBlock,则会打印 addBlockmulBlocksubBlock 的结果。

我的代码基于 Stephen Cleary 的 C# Cookbook 中的并发性 示例(第 4.1 节 链接块(第 48 页)):

var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
var subtractBlock = new TransformBlock<int, int>(item => item - 2);

var options = new DataflowLinkOptions { PropagateCompletion = true };
multiplyBlock.LinkTo(subtractBlock, options);

...

// The first block's completion is automatically propagated to the second block.
multiplyBlock.Complete();
await subtractBlock.Completion;

当我设置 Cleary 的代码以匹配我的代码时,出现了相同的行为。程序打印结果并仅在我等待 multiplyBlock.Completion.

时终止

问题是一个块只有在清空其所有队列(包括输出队列)后才会完成。在您的情况下发生的情况是完成正确传播,但随后 divBlock 卡在 "almost complete" 模式,等待其输出队列中的项目被删除。

要解决此问题,您可以将 divBlock 更改为 ActionBlock,或者您可以将 link 更改为 DataflowBlock.NullTarget<int>()