创建将在任何失败时完成的 ActionBlock<T> 列表

Create list of ActionBlock<T> that will complete when any fail

在可以对 'empty' 任务列表调用 await 的情况下。

如何等待 Task<T> 列表,然后将新任务添加到等待列表,直到其中一个任务失败或完成。

我相信这个问题一定有 Awaiter or CancellationTokenSource 解决方案。

public class LinkerThingBob
{
    private List<Task> ofmyactions = new List<Task>();

    public void LinkTo<T>(BufferBlock<T> messages) where T : class
    {
        var action = new ActionBlock<IMsg>(_ => this.Tx(messages, _));

        // this would not actually work, because the WhenAny 
        // will not include subsequent actions.
        ofmyactions.Add(action.Completion);

        // link the new action block.
        this._inboundMessageBuffer.LinkTo(block);
    }

    // used to catch exceptions since these blocks typically don't end.
    public async Task CompletionAsync()
    {
        // how do i make the awaiting thread add a new action
        // to the list of waiting tasks without interrupting it
        // or graciously interrupting it to let it know there's one more

        // more importantly, this CompletionAsync might actually be called
        // before the first action is added to the list, so I actually need
        // WhenAny(INFINITE + ofmyactions)
        await Task.WhenAny(ofmyactions);
    }
}

我的问题是我需要一种机制,我可以将上面创建的每个 action 实例添加到一个 Task<T> 中,该实例将在出现异常时完成。

我不确定如何最好地解释这一点,但是:

没有任何内置的东西,但使用 TaskCompletionSource<T> 构建一个并不难。 TCS 是您想要等待某事但尚无构造时使用的类型。 (自定义等待者用于更高级的场景)。

在这种情况下,像这样的东西就足够了:

public class LinkerThingBob
{
  private readonly TaskCompletionSource<object> _tcs = new TaskCompletionSource<object>();

  private async Task ObserveAsync(Task task)
  {
    try
    {
      await task;
      _tcs.TrySetResult(null);
    }
    catch (Exception ex)
    {
      _tcs.TrySetException(ex);
    }
  }

  public void LinkTo<T>(BufferBlock<T> messages) where T : class
  {
    var action = new ActionBlock<IMsg>(_ => this.Tx(messages, _));

    var _ = ObserveAsync(action.Completion);

    this._inboundMessageBuffer.LinkTo(block);
  }

  public Task Completion { get { return _tcs.Task; } }
}

Completion 以未完成状态开始。使用 ObserveAsync 可以链接任意数量的块。一旦其中一个块完成,Completion 也会完成。我在这里写 ObserveAsync 的方式是,如果第一个完成的块没有错误地完成,那么 Completion 也会如此;如果第一个完成的块以异常完成,则 Completion 将以相同的异常完成。随意调整您的特定需求。 :)

这是一个专门使用 TPL 数据流库本身工具的解决方案。您可以创建一个 TransformBlock 来“处理”您想要观察的 ActionBlock。处理一个块意味着简单地等待它的完成。所以 TransformBlock 接受不完整的块,并输出与完成的块相同的块。 TransformBlock 必须配置无限并行和容量,并禁用排序,以便同时观察所有块,并立即返回每个完成的块。

var allBlocks = new TransformBlock<ActionBlock<IMsg>, ActionBlock<IMsg>>(async block =>
{
    try { await block.Completion; }
    catch { }
    return block;
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    EnsureOrdered = false
});

然后在 LinkerThingBob.LinkTo 方法中,将创建的 ActionBlock 发送到 TransformBlock.

var actionBlock = new ActionBlock<IMsg>(_ => this.Tx(messages, _));
allBlocks.Post(actionBlock);

现在你需要一个目标来接收第一个故障块。 WriteOnceBlock 非常适合这个角色,因为它确保最多接收一个故障块。

var firstFaulted = new WriteOnceBlock<ActionBlock<IMsg>>(x => x);

allBlocks.LinkTo(firstFaulted, block => block.Completion.IsFaulted);

终于可以await在任何地方完成WriteOnceBlock了。它会在收到故障块后立即完成,或者如果它从未收到故障块,它可能永远不会完成。

await firstFaulted.Completion;

等待之后,如果需要,您还可以获取故障块。

ActionBlock<IMsg> faultedBlock = firstFaulted.Receive();

WriteOnceBlock在转发消息时的行为方式很特殊。与大多数其他块不同,您可以多次调用它的 Receive 方法,并且您总是会得到它包含的相同的单个项目(在第一个 Receive 之后它不会从缓冲区中删除)。