异步调用事件并捕获异常而不停止来自 运行 的其他任务

Invoke event asynchronously and catching exceptions without stopping other tasks from running

给定以下代码:

public delegate Task AsyncEventHandler<in TEventArgs>(object sender, TEventArgs eventArgs);

public static Task InvokeAsync(this AsyncEventHandler eventHandler, object sender, EventArgs eventArgs)
{
    if (eventHandler == null) return Task.CompletedTask;

    var delegates = eventHandler.GetInvocationList().Cast<AsyncEventHandler>();
    var tasks = delegates.Select(it => it.Invoke(sender, eventArgs));

    return Task.WhenAll(tasks);
}

我有一个测试函数,故障处理程序应该抛出异常,workinghanlder 应该 运行 - 目前只有第一个 FaultyHandler1 被调用,没有其他事件处理程序。

private class NonGenericNotifier
{
    public event AsyncEventHandler SomethingHappened;
    public Task OnSomethingHappening() => SomethingHappened.InvokeAsync(this, EventArgs.Empty);
}

public async Task Multiple_Exceptions_That_Occur_During_Event_Handling_Should_Be_Propagated()
{
    var isHandler1Called = false;
    var isHandler2Called = false;
    var isWorkingHandlerCalled = false;

    var notifier = new NonGenericNotifier();

    Task FaultyHandler1(object sender, EventArgs eventArgs)
    {
        isHandler1Called = true;
        throw new InvalidOperationException();
    }

    Task FaultyHandler2(object sender, EventArgs eventArgs)
    {
        isHandler2Called = true;
        throw new InvalidOperationException();
    }

    Task WorkingHandler(object sender, EventArgs eventArgs)
    {
        isWorkingHandlerCalled = true;
        return Task.CompletedTask;
    }

    notifier.SomethingHappened += FaultyHandler1;
    notifier.SomethingHappened += FaultyHandler2;
    notifier.SomethingHappened += WorkingHandler;

    await Should.ThrowAsync<InvalidOperationException>(async () => await notifier.OnSomethingHappening());
    isHandler1Called.ShouldBe(true);
    isHandler2Called.ShouldBe(true);
    isWorkingHandlerCalled.ShouldBe(true);
}

假设可以抛出一个异常,我相信这应该是一个 AggregateException 包含每个任务的异常,最重要的是上面的 InvokeAsync 方法应该在遇到第一个异常时退出。

我已经开始在 InvokeAsync 扩展方法中创建一个 List<Exception>,并用 try/catch 构造包装每个 it => it.Invoke(sender, eventArgs),并在 catch 中添加异常到例外列表。

但是我不知道如何整理这个例外列表,然后作为 AggregateException.

发送

更新(修复?)

感谢 Artur 为我指明了正确的方向。我将 InvokeAsync 扩展方法更改为以下,它 有效 - 不再停止第一个任务。我们使用 code here:

var tasks = delegates.Select(it => it.Invoke(sender, eventArgs)); 到下面
public static Task InvokeAsync(this AsyncEventHandler eventHandler, object sender, EventArgs eventArgs)
{
    if (eventHandler == null) return Task.CompletedTask;

    var delegates = eventHandler.GetInvocationList().Cast<AsyncEventHandler>();
    var tasks = delegates.Select(async it => await it.Invoke(sender, eventArgs));

    return Task.WhenAll(tasks).WithAggregatedExceptions();
}

static Task WithAggregatedExceptions(this Task task)
{
    // using AggregateException.Flatten as a bonus
    return task.ContinueWith(
        continuationFunction: priorTask =>
            priorTask.IsFaulted &&
            priorTask.Exception is AggregateException ex && (ex.InnerExceptions.Count > 1 || ex.InnerException is AggregateException) ? Task.FromException(ex.Flatten()) : priorTask,
        cancellationToken: CancellationToken.None,
        TaskContinuationOptions.ExecuteSynchronously,
        scheduler: TaskScheduler.Default).Unwrap();
}

我的问题是此事件的订阅者正在编写我无法控制的同步处理程序 - 这会停止附加到同一事件的其他事件处理程序(同步和异步)运行ning。

我也很欣赏这是 Task.WhenAll 的设计功能,如果您混合使用异步和非异步处理程序...如果有一个理由不在没有 [=24 的情况下在异步函数中编写同步代码=]就是这样。

问题

我们可以说用 async/await 包装 delegates.Select(async it => await it.Invoke(sender, eventArgs) 允许同步方法 运行,并且在 最差(?) 包装两次异步方法(与嵌套 async/await 函数调用相同)所以实际上不是问题?

是否引入了任何副作用?

随着赏金寻找有关如何实施的权威指导,一个答案(非常感谢为讨论做出贡献)说要避免异步事件,但在其他地方,如 discord c# 客户端,他们已经接受了异步事件(带有超时包装器等)。

在我看来,以异步方式使用 C# 事件的设计并不健壮,而且它总是会以稍微不受控制的方式运行。有更好的技术可以使事件处理更健壮。

最好的此类技术之一是 TPL 数据流 (https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library)。这个库允许您以非常可控的方式编写流处理程序,它可以帮助您处理任务调度程序等。一旦您成功应用它,您问题中的所有问题都将得到解决。

显然还有其他选择,但我会明确放弃 re-implementing 使用 C# 事件....

Task.WhenAll 将在具体化其参数时调用所有处理程序。它将一次调用一个,然后异步等待所有任务完成。

您看到第一个异常停止的原因是因为异常是在具体化期间抛出的。异步 (Task-returning) 函数在 returned 任务上放置任何异常是正常的。异步函数直接抛出异常是不正常的

所以,这是有问题的代码:

Task FaultyHandler1(object sender, EventArgs eventArgs)
{
  isHandler1Called = true;
  throw new InvalidOperationException();
}

其中一个是正确的:

async Task FaultyHandler1(object sender, EventArgs eventArgs)
{
  isHandler1Called = true;
  throw new InvalidOperationException();
}

Task FaultyHandler1(object sender, EventArgs eventArgs)
{
  isHandler1Called = true;
  return Task.FromException(new InvalidOperationException());
}

您看到奇怪的行为是因为异步处理程序行为不当(通过抛出同步异常)。

现在,如果您想要允许行为不当的异步处理程序,您可以做到这一点,或者使用明确的try /catch 或额外的 async/await:

var tasks = delegates.Select(it => try { return it.Invoke(sender, eventArgs); } catch (Exception ex) { return Task.FromException(ex); });

// async/await is necessary to handle misbehaving asynchronous handlers that throw synchronous exceptions
var tasks = delegates.Select(async it => await it.Invoke(sender, eventArgs));

如果您确实保留 async/await 方法,请对其进行评论,因为这样的编码结构通常被认为是虚假的,可能会被未来的维护者删除。

WithAggregatedExceptions看起来不错as-is,但如果你愿意,可以简化它:

static async Task WithAggregatedExceptions(this Task task)
{
  try { await task; }
  catch { throw task.Exception; }
}

My issue is subscribers of this event, writing synchronous handlers over which I have no control - this would stop the other event handlers (sync and async) running that are attached to the same event.

嗯,是的。 Task.WhenAll 同步具体化其任务集合,一次调用所有处理程序。

如果您想同时允许同步处理程序和异步处理程序,您可以将调用包装在Task.Run:

var tasks = delegates.Select(it => Task.Run(() => it.Invoke(sender, eventArgs)));

Can we say that wrapping the delegates.Select(async it => await it.Invoke(sender, eventArgs) with async/await allows synchronous method to run, and at worst(?) wrap twice an async method (which is the same as nesting async/await function calls) so is actually a non-issue?

异步处理程序的额外 async/await 是 non-issue;它的效率非常低,而且 似乎 没有必要,所以我想说它有被删除的危险(除非评论)。它不“允许 运行 的同步方法”;相反,它纠正了直接抛出异常的行为不当的方法,而不是按预期将它们放在 returned Task 上。

Are there any side effects that have been introduced?

不是真的。如果您确实使用 Task.Run 方法,那么所有处理程序都会在线程池线程上调用,并且可能 运行 并发,这可能会令人惊讶。

one answer (much appreciated for contributing to the discussion) says to avoid async events, yet in other places like the discord c# client they have embraced async events (with timeout wrappers etc).

我 100% 同意这个答案。

以下是我的看法:

观察者模式是一种通知观察者状态变化的方式。观察者模式非常适合 OOP 中的“事件”:任意数量的观察者都可以订阅状态更改通知。这就是 C# 事件的模式:通知订阅者事情。没有信息“回流”的机制。虽然语言 允许 具有 return 值的 C# 事件,但无论如何它都不是 natural。同样的限制发生在异常(可以被认为是一种 return):标准 handler?.Invoke 模式开始中断(在第一个异常时停止调用等)。

一旦你有信息“回流”(包括需要处理异常,或者需要 await 所有处理程序完成),你就不再处于观察者模式中,你是不再处于 C# 事件的快乐路径中。

一般来说,我发现这种类型的大多数“事件”通常与策略或访问者模式相关,而不是与观察者相关。 Strategy 和 Visitor 都不适合 C# 事件,尽管它们通常(遗憾地)以这种方式实现。我认为这是一个常见的设计错误(对于所有 OOP 语言)。