创建将在任何失败时完成的 ActionBlock<T> 列表
Create list of ActionBlock<T> that will complete when any fail
在可以对 'empty' 任务列表调用 await 的情况下。
如何等待 Task<T>
列表,然后将新任务添加到等待列表,直到其中一个任务失败或完成。
我相信这个问题一定有 Awaiter
or CancellationTokenSource
解决方案。
public class LinkerThingBob
{
private List<Task> ofmyactions = new List<Task>();
public void LinkTo<T>(BufferBlock<T> messages) where T : class
{
var action = new ActionBlock<IMsg>(_ => this.Tx(messages, _));
// this would not actually work, because the WhenAny
// will not include subsequent actions.
ofmyactions.Add(action.Completion);
// link the new action block.
this._inboundMessageBuffer.LinkTo(block);
}
// used to catch exceptions since these blocks typically don't end.
public async Task CompletionAsync()
{
// how do i make the awaiting thread add a new action
// to the list of waiting tasks without interrupting it
// or graciously interrupting it to let it know there's one more
// more importantly, this CompletionAsync might actually be called
// before the first action is added to the list, so I actually need
// WhenAny(INFINITE + ofmyactions)
await Task.WhenAny(ofmyactions);
}
}
我的问题是我需要一种机制,我可以将上面创建的每个 action
实例添加到一个 Task<T>
中,该实例将在出现异常时完成。
我不确定如何最好地解释这一点,但是:
任务必须至少调用一次 LinkTo<T>
才能完成,所以我需要从无限任务开始
每次调用 LinkTo<T>
时,必须将新操作添加到任务列表中,该任务可能已在另一个线程中等待。
没有任何内置的东西,但使用 TaskCompletionSource<T>
构建一个并不难。 TCS 是您想要等待某事但尚无构造时使用的类型。 (自定义等待者用于更高级的场景)。
在这种情况下,像这样的东西就足够了:
public class LinkerThingBob
{
private readonly TaskCompletionSource<object> _tcs = new TaskCompletionSource<object>();
private async Task ObserveAsync(Task task)
{
try
{
await task;
_tcs.TrySetResult(null);
}
catch (Exception ex)
{
_tcs.TrySetException(ex);
}
}
public void LinkTo<T>(BufferBlock<T> messages) where T : class
{
var action = new ActionBlock<IMsg>(_ => this.Tx(messages, _));
var _ = ObserveAsync(action.Completion);
this._inboundMessageBuffer.LinkTo(block);
}
public Task Completion { get { return _tcs.Task; } }
}
Completion
以未完成状态开始。使用 ObserveAsync
可以链接任意数量的块。一旦其中一个块完成,Completion
也会完成。我在这里写 ObserveAsync
的方式是,如果第一个完成的块没有错误地完成,那么 Completion
也会如此;如果第一个完成的块以异常完成,则 Completion
将以相同的异常完成。随意调整您的特定需求。 :)
这是一个专门使用 TPL 数据流库本身工具的解决方案。您可以创建一个 TransformBlock
来“处理”您想要观察的 ActionBlock
。处理一个块意味着简单地等待它的完成。所以 TransformBlock
接受不完整的块,并输出与完成的块相同的块。 TransformBlock
必须配置无限并行和容量,并禁用排序,以便同时观察所有块,并立即返回每个完成的块。
var allBlocks = new TransformBlock<ActionBlock<IMsg>, ActionBlock<IMsg>>(async block =>
{
try { await block.Completion; }
catch { }
return block;
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
EnsureOrdered = false
});
然后在 LinkerThingBob.LinkTo
方法中,将创建的 ActionBlock
发送到 TransformBlock
.
var actionBlock = new ActionBlock<IMsg>(_ => this.Tx(messages, _));
allBlocks.Post(actionBlock);
现在你需要一个目标来接收第一个故障块。 WriteOnceBlock
非常适合这个角色,因为它确保最多接收一个故障块。
var firstFaulted = new WriteOnceBlock<ActionBlock<IMsg>>(x => x);
allBlocks.LinkTo(firstFaulted, block => block.Completion.IsFaulted);
终于可以await
在任何地方完成WriteOnceBlock
了。它会在收到故障块后立即完成,或者如果它从未收到故障块,它可能永远不会完成。
await firstFaulted.Completion;
等待之后,如果需要,您还可以获取故障块。
ActionBlock<IMsg> faultedBlock = firstFaulted.Receive();
WriteOnceBlock
在转发消息时的行为方式很特殊。与大多数其他块不同,您可以多次调用它的 Receive
方法,并且您总是会得到它包含的相同的单个项目(在第一个 Receive
之后它不会从缓冲区中删除)。
在可以对 'empty' 任务列表调用 await 的情况下。
如何等待 Task<T>
列表,然后将新任务添加到等待列表,直到其中一个任务失败或完成。
我相信这个问题一定有 Awaiter
or CancellationTokenSource
解决方案。
public class LinkerThingBob
{
private List<Task> ofmyactions = new List<Task>();
public void LinkTo<T>(BufferBlock<T> messages) where T : class
{
var action = new ActionBlock<IMsg>(_ => this.Tx(messages, _));
// this would not actually work, because the WhenAny
// will not include subsequent actions.
ofmyactions.Add(action.Completion);
// link the new action block.
this._inboundMessageBuffer.LinkTo(block);
}
// used to catch exceptions since these blocks typically don't end.
public async Task CompletionAsync()
{
// how do i make the awaiting thread add a new action
// to the list of waiting tasks without interrupting it
// or graciously interrupting it to let it know there's one more
// more importantly, this CompletionAsync might actually be called
// before the first action is added to the list, so I actually need
// WhenAny(INFINITE + ofmyactions)
await Task.WhenAny(ofmyactions);
}
}
我的问题是我需要一种机制,我可以将上面创建的每个 action
实例添加到一个 Task<T>
中,该实例将在出现异常时完成。
我不确定如何最好地解释这一点,但是:
任务必须至少调用一次
LinkTo<T>
才能完成,所以我需要从无限任务开始每次调用
LinkTo<T>
时,必须将新操作添加到任务列表中,该任务可能已在另一个线程中等待。
没有任何内置的东西,但使用 TaskCompletionSource<T>
构建一个并不难。 TCS 是您想要等待某事但尚无构造时使用的类型。 (自定义等待者用于更高级的场景)。
在这种情况下,像这样的东西就足够了:
public class LinkerThingBob
{
private readonly TaskCompletionSource<object> _tcs = new TaskCompletionSource<object>();
private async Task ObserveAsync(Task task)
{
try
{
await task;
_tcs.TrySetResult(null);
}
catch (Exception ex)
{
_tcs.TrySetException(ex);
}
}
public void LinkTo<T>(BufferBlock<T> messages) where T : class
{
var action = new ActionBlock<IMsg>(_ => this.Tx(messages, _));
var _ = ObserveAsync(action.Completion);
this._inboundMessageBuffer.LinkTo(block);
}
public Task Completion { get { return _tcs.Task; } }
}
Completion
以未完成状态开始。使用 ObserveAsync
可以链接任意数量的块。一旦其中一个块完成,Completion
也会完成。我在这里写 ObserveAsync
的方式是,如果第一个完成的块没有错误地完成,那么 Completion
也会如此;如果第一个完成的块以异常完成,则 Completion
将以相同的异常完成。随意调整您的特定需求。 :)
这是一个专门使用 TPL 数据流库本身工具的解决方案。您可以创建一个 TransformBlock
来“处理”您想要观察的 ActionBlock
。处理一个块意味着简单地等待它的完成。所以 TransformBlock
接受不完整的块,并输出与完成的块相同的块。 TransformBlock
必须配置无限并行和容量,并禁用排序,以便同时观察所有块,并立即返回每个完成的块。
var allBlocks = new TransformBlock<ActionBlock<IMsg>, ActionBlock<IMsg>>(async block =>
{
try { await block.Completion; }
catch { }
return block;
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
EnsureOrdered = false
});
然后在 LinkerThingBob.LinkTo
方法中,将创建的 ActionBlock
发送到 TransformBlock
.
var actionBlock = new ActionBlock<IMsg>(_ => this.Tx(messages, _));
allBlocks.Post(actionBlock);
现在你需要一个目标来接收第一个故障块。 WriteOnceBlock
非常适合这个角色,因为它确保最多接收一个故障块。
var firstFaulted = new WriteOnceBlock<ActionBlock<IMsg>>(x => x);
allBlocks.LinkTo(firstFaulted, block => block.Completion.IsFaulted);
终于可以await
在任何地方完成WriteOnceBlock
了。它会在收到故障块后立即完成,或者如果它从未收到故障块,它可能永远不会完成。
await firstFaulted.Completion;
等待之后,如果需要,您还可以获取故障块。
ActionBlock<IMsg> faultedBlock = firstFaulted.Receive();
WriteOnceBlock
在转发消息时的行为方式很特殊。与大多数其他块不同,您可以多次调用它的 Receive
方法,并且您总是会得到它包含的相同的单个项目(在第一个 Receive
之后它不会从缓冲区中删除)。