C# 使用 DataflowBlock.Completion 取消消费者任务而不是 CancellationToken

C# using DataflowBlock.Completion to cancel consumer tasks instead of CancellationToken

我想知道是否有一种巧妙的方法可以让 IDataflowBlock.Completion 取代对 ReceiveAsync 使用取消令牌的需要,或者使用 BufferBlock 或另一个 IDataflowBlock.

IDataflowBlock.ReceiveAsync<T>(TimeSpan, CancellationToken)

如果 InputQueueBufferBlock:

BufferBlock<String> InputQueue 

for (int i = 0; i < 26; i++)            
{
    await InputQueue.SendAsync(((char)(97 + i)).ToString());
}

如果InputQueue.Complete();已经被调用,那么当队列被清空并且IDataflowBlock.Completion会变为状态RanToCompletion, 可以用 IDataflowBlock.Completion.IsCompleted.

检查

如果多个线程正在从队列中获取数据,这可能会在 InputQueue.ReceiveAsync 期间发生,是否有比

更简洁的方法来处理 InputQueue 完成
try
{
    String parcel = await InputQueue.ReceiveAsync(timeSpan);
}
catch(InvalidOperationException x)
{

}

cancel a Dataflow Block 的最简单方法是向块的构造函数提供令牌,如下所示:

new ExecutionDataflowBlockOptions
{
    CancellationToken = cancellationSource.Token
});

CancellationToken is defined in Dataflow​Block​Options class,所以连BufferBlock都可以取消。

为什么要自己实现 Receive 逻辑?使用 PropagateCompletion with linking your blocks 是否有一些限制?例如,如果您的代码如下所示:

internal void HandleMessage()
{
    try
    {
        var parcel = await InputQueue.ReceiveAsync(timeSpan);
        // handle parsel
    }
    catch(InvalidOperationException x)
    {
    }
}

那么你可以像这样使用 ActionBlock:

var InputQueue = new BufferBlock<string>();
var Handler = new ActionBlock<string>(parcel =>
{
    // handle parsel
});
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
InputQueue.LinkTo(Handler, linkOptions);

// now after you call Complete method for InputQueue the completion will be propagated to your Handler block:
for (int i = 0; i < 26; i++)            
{
    await InputQueue.SendAsync(((char)(97 + i)).ToString());
}
InputQueue.Complete();
await Handler.Completion;

另请注意,如果您需要与 UI 进行一些交互,您可以将最后一个块用作 IObservable with Rx.Net 库。