如何等待我的自定义数据流块完成?
How can I await my custom data flow block completion?
我正在使用 ITargetBlock,并且已经成功地创建了一个自定义数据流块。但是,我不能全神贯注于实施 "Completion" 任务。我目前将其定义为:
public Task Completion { get; }
建立数据流线后,我会尝试等待此完成任务,但会引用一个 "null" 对象。这是因为我没有将完成任务分配给任何东西。
我应该将其分配给什么?我认为它会自动与数据流块相关联,但它默认为无...
我应该将 Completion 分配给什么?
当您实施代码时,一旦您调用 Complete()
并且所有 "in flight" 工作完成,Completion
应该完成。
这可以通过 TaskCompletionSource 来完成
public Example<TInput>() : ITargetBlock
{
private TaskCompletionSource<Object> tcs = new TaskCompletionSource<Object>()
public Task Completion { get; }
public Example()
{
Completion = tcs.Task;
}
public void Complete()
{
// We run this on a background thread because we don't want the call to Complete be blocking.
Task.Run(() => {
// Wait here for any currently executing async work your dataflow block does to finish.
// ...
tcs.TrySetResult(null);
}
}
public void Fault (Exception exception)
{
// Cancel here any running work.
// ...
tcs.TrySetException(exception);
}
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<in TInput> source, bool consumeToAccept)
{
if (Completion.IsCompleted)
{
return DataflowMessageStatus.DecliningPermanently;
}
// ...
}
}
我正在使用 ITargetBlock,并且已经成功地创建了一个自定义数据流块。但是,我不能全神贯注于实施 "Completion" 任务。我目前将其定义为:
public Task Completion { get; }
建立数据流线后,我会尝试等待此完成任务,但会引用一个 "null" 对象。这是因为我没有将完成任务分配给任何东西。
我应该将其分配给什么?我认为它会自动与数据流块相关联,但它默认为无...
我应该将 Completion 分配给什么?
当您实施代码时,一旦您调用 Complete()
并且所有 "in flight" 工作完成,Completion
应该完成。
这可以通过 TaskCompletionSource 来完成
public Example<TInput>() : ITargetBlock
{
private TaskCompletionSource<Object> tcs = new TaskCompletionSource<Object>()
public Task Completion { get; }
public Example()
{
Completion = tcs.Task;
}
public void Complete()
{
// We run this on a background thread because we don't want the call to Complete be blocking.
Task.Run(() => {
// Wait here for any currently executing async work your dataflow block does to finish.
// ...
tcs.TrySetResult(null);
}
}
public void Fault (Exception exception)
{
// Cancel here any running work.
// ...
tcs.TrySetException(exception);
}
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<in TInput> source, bool consumeToAccept)
{
if (Completion.IsCompleted)
{
return DataflowMessageStatus.DecliningPermanently;
}
// ...
}
}