C# 使用 DataflowBlock.Completion 取消消费者任务而不是 CancellationToken
C# using DataflowBlock.Completion to cancel consumer tasks instead of CancellationToken
我想知道是否有一种巧妙的方法可以让 IDataflowBlock.Completion
取代对 ReceiveAsync
使用取消令牌的需要,或者使用 BufferBlock
或另一个 IDataflowBlock
.
IDataflowBlock.ReceiveAsync<T>(TimeSpan, CancellationToken)
如果 InputQueue
是 BufferBlock
:
BufferBlock<String> InputQueue
for (int i = 0; i < 26; i++)
{
await InputQueue.SendAsync(((char)(97 + i)).ToString());
}
如果InputQueue.Complete();
已经被调用,那么当队列被清空并且IDataflowBlock.Completion
会变为状态RanToCompletion,
可以用 IDataflowBlock.Completion.IsCompleted
.
检查
如果多个线程正在从队列中获取数据,这可能会在 InputQueue.ReceiveAsync
期间发生,是否有比
更简洁的方法来处理 InputQueue
完成
try
{
String parcel = await InputQueue.ReceiveAsync(timeSpan);
}
catch(InvalidOperationException x)
{
}
cancel a Dataflow Block 的最简单方法是向块的构造函数提供令牌,如下所示:
new ExecutionDataflowBlockOptions
{
CancellationToken = cancellationSource.Token
});
CancellationToken
is defined in DataflowBlockOptions
class,所以连BufferBlock
都可以取消。
为什么要自己实现 Receive
逻辑?使用 PropagateCompletion
with linking your blocks 是否有一些限制?例如,如果您的代码如下所示:
internal void HandleMessage()
{
try
{
var parcel = await InputQueue.ReceiveAsync(timeSpan);
// handle parsel
}
catch(InvalidOperationException x)
{
}
}
那么你可以像这样使用 ActionBlock
:
var InputQueue = new BufferBlock<string>();
var Handler = new ActionBlock<string>(parcel =>
{
// handle parsel
});
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
InputQueue.LinkTo(Handler, linkOptions);
// now after you call Complete method for InputQueue the completion will be propagated to your Handler block:
for (int i = 0; i < 26; i++)
{
await InputQueue.SendAsync(((char)(97 + i)).ToString());
}
InputQueue.Complete();
await Handler.Completion;
另请注意,如果您需要与 UI 进行一些交互,您可以将最后一个块用作 IObservable
with Rx.Net
库。
我想知道是否有一种巧妙的方法可以让 IDataflowBlock.Completion
取代对 ReceiveAsync
使用取消令牌的需要,或者使用 BufferBlock
或另一个 IDataflowBlock
.
IDataflowBlock.ReceiveAsync<T>(TimeSpan, CancellationToken)
如果 InputQueue
是 BufferBlock
:
BufferBlock<String> InputQueue
for (int i = 0; i < 26; i++)
{
await InputQueue.SendAsync(((char)(97 + i)).ToString());
}
如果InputQueue.Complete();
已经被调用,那么当队列被清空并且IDataflowBlock.Completion
会变为状态RanToCompletion,
可以用 IDataflowBlock.Completion.IsCompleted
.
如果多个线程正在从队列中获取数据,这可能会在 InputQueue.ReceiveAsync
期间发生,是否有比
InputQueue
完成
try
{
String parcel = await InputQueue.ReceiveAsync(timeSpan);
}
catch(InvalidOperationException x)
{
}
cancel a Dataflow Block 的最简单方法是向块的构造函数提供令牌,如下所示:
new ExecutionDataflowBlockOptions
{
CancellationToken = cancellationSource.Token
});
CancellationToken
is defined in DataflowBlockOptions
class,所以连BufferBlock
都可以取消。
为什么要自己实现 Receive
逻辑?使用 PropagateCompletion
with linking your blocks 是否有一些限制?例如,如果您的代码如下所示:
internal void HandleMessage()
{
try
{
var parcel = await InputQueue.ReceiveAsync(timeSpan);
// handle parsel
}
catch(InvalidOperationException x)
{
}
}
那么你可以像这样使用 ActionBlock
:
var InputQueue = new BufferBlock<string>();
var Handler = new ActionBlock<string>(parcel =>
{
// handle parsel
});
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
InputQueue.LinkTo(Handler, linkOptions);
// now after you call Complete method for InputQueue the completion will be propagated to your Handler block:
for (int i = 0; i < 26; i++)
{
await InputQueue.SendAsync(((char)(97 + i)).ToString());
}
InputQueue.Complete();
await Handler.Completion;
另请注意,如果您需要与 UI 进行一些交互,您可以将最后一个块用作 IObservable
with Rx.Net
库。