了解 TPL 数据流、块和延续任务

Understanding TPL Dataflow, Blocks, and Continuation Tasks

我正在开展一个项目,该项目的需求非常适合 TPL 数据流。我对它的经验相对有限(而且我所做的是前段时间所做的),我一直在通过阅读微软的文档以及我可以在网上找到的文章来复习它。

完成后,我构建了我的代码以将一系列块链接在一起(主要是 TransformBlock 并以 ActionBlock 结束,执行如下操作:

var block1 = new TransformBlock<T, U>(async input => {});
var block2 = new TransformBlock<U, V>(async input => {});
var block3 = new ActionBlock<V>(async input => {});

block1.LinkTo(block2);
block2.LinkTo(block3);

foreach(var item in items)
{
   await block1.SendAsync(item);
}
block1.Complete();
await block3.Completion;

有人在一篇文章中(我找不到)建议管道中应该有继续任务来将块标记为完成。这是他们为此提供的代码。

// Create the continuation tasks in the pipeline that marks each block as complete.
await block1.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) { ((IDataflowBlock)block2).Fault(t.Exception); }
    else { block2.Complete(); }
});
await block2.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) { ((IDataflowBlock)block3).Fault(t.Exception); }
    else { block3.Complete(); }
});

我承认我并不完全理解这段代码在做什么,甚至是否需要它。当我在刚写的代码中尝试 运行 时,代码挂在第一个 ContinueWith 上,它永远不会进入 运行 管道。

我希望得到额外的解释,因为我想更好地理解这里发生的事情的细微差别。

对于线性管道,您需要做的就是PropagateCompletion。该选项传播 Completion 以及 Faults,然后将其异常附加到最终的 Completion Task:

var linkOptions = new DataflowLinkOptions() { PropagateCompletion = true };
block1.LinkTo(block2, linkOptions);
block2.LinkTo(block3, linkOptions);

不需要继续。但是如果你有分布到多个块的管道,你需要自己处理完成和错误传播 .

the code hangs on the first ContinueWith

发生这种情况是因为您 await 继续,所以如果您在调用 Complete() 之前这样做,那么 block1 将永远不会完成。