为什么是 UnobservedTaskException?

Why UnobservedTaskException?

为什么这段代码会导致未处理的异常。我希望根据是否处理 Rx 订阅,在 OnError 处理程序中吞下或报告异常。

除了在每个任务上写一个延续来检查它是否出错然后查看它的异常之外,还有什么好的方法可以避免这种情况属性?

认为 Rx 为我们做了这个,但显然不是。

public static void Main(string[] args)
    {
        TaskScheduler.UnobservedTaskException += (s, a) =>
        {
            Console.WriteLine("***************\nUnhandled '{0}' detected.", a.Exception.InnerException.GetType());
        };

        var d = Observable.FromAsync(() => Task.WhenAny(Do1(), Do2()))
            .Subscribe(x => { }, ex => Console.WriteLine($"Error {ex}"), () => Console.WriteLine("Completed Inner"));

        Console.WriteLine("Press any key to dispose");
        Console.ReadKey();
        d.Dispose();
        GC.Collect();
        GC.WaitForPendingFinalizers();
        Console.ReadKey();
    }

    private static Task Do1()
    {
        return Task.Run(async () =>
        {
            await Task.Delay(500);
            throw new AccessViolationException("Oops 1");
        });
    }

    private static Task Do2()
    {
        return Task.Run(async () =>
        {
            await Task.Delay(600);
            throw new AccessViolationException("Oops 2");
        });
    }

不是未处理;它没有被观察到。

未观察到的异常是由于这段代码:Task.WhenAny(Do1(), Do2()) 与 Rx 无关。 Task.WhenAny 的 return 类型是 Task<Task> - 即,解决已完成任务的未来。为了观察这个内部任务的结果(包括异常),您需要 await 完成的任务,而不是 return 来自 Task.WhenAny 的任务。

所以现在在您的代码中,您最终得到了一个可观察到的 任务 序列,这很奇怪但合法。 FromAsync returns IObservable<Task>,并且 Subscribe lambda 中的 xTask.

类型

我相信您希望 Do1Do2 的结果成为可观察对象中的实际数据,为此,您需要解包该任务。 "double await" 在调用 Task.WhenAny:

时并不少见
var d = Observable.FromAsync(async () => await await Task.WhenAny(Do1(), Do2()))
    .Subscribe(x => { }, ex => Console.WriteLine($"Error {ex}"), () => Console.WriteLine("Completed Inner"));

或者,您可以使用 Unwrap 方法:

var d = Observable.FromAsync(() => Task.WhenAny(Do1(), Do2()).Unwrap())
    .Subscribe(x => { }, ex => Console.WriteLine($"Error {ex}"), () => Console.WriteLine("Completed Inner"));

哪个更适合您的实际代码。

更新: 仅针对特定任务防止未观察到的异常的示例扩展方法:

public static Task IgnoreUnobservedExceptions(this Task task)
{
  Ignore(task);
  return task;
}

public static Task<T> IgnoreUnobservedExceptions<T>(this Task<T> task)
{
  Ignore(task);
  return task;
}

private static async void Ignore(this Task task)
{
  try { await task.ConfigureAwait(false); }
  catch { }
}

用法:Task.WhenAny(Do1().IgnoreUnobservedExceptions(), Do2().IgnoreUnobservedExceptions())

Rx 在出现以下任一情况时,可以很好地冒出错误并自行清理:

  • Observable 被取消(通过处理订阅)
  • 可观察完成。
  • Observable 发生错误。

但要实现这一点,整个计算链需要处于可观察订阅的控制之下。

在你的代码中没有发生。

Observable.FromAsyncFunc<Task<T>> 和 returns 和 IObservable<T>。您的 Observable.FromAsync 正在返回一个 IObservable<Task>,这意味着您的参数正在返回一个 Task<Task>。这就是 Task.WhenAny(Do1(), Do2()) 正在做的事情。这意味着 Task<Task> 的内部任务不受可观察订阅的控制。

要修复代码,您可以做两件事。

首先是修复 TPL,使其处于 observable 的控制之下。这意味着使其可取消并展开 Task.WhenAny.

这是新的 Do 方法:

private static Task Do1(CancellationToken ct)
{
    return Task.Run(async () =>
    {
        await Task.Delay(500, ct);
        throw new AccessViolationException("Oops 1");
    }, ct);
}

private static Task Do2(CancellationToken ct)
{
    return Task.Run(async () =>
    {
        await Task.Delay(600, ct);
        throw new AccessViolationException("Oops 2");
    }, ct);
}

现在你可以这样写你的代码了:

using (
    Observable
        .FromAsync(ct => Task.WhenAny(Do1(ct), Do2(ct)).Unwrap())
        .Subscribe(
            x => { },
            ex => Console.WriteLine($"Error {ex}"),
            () => Console.WriteLine("Completed Inner")))
{

    Console.WriteLine("Press any key to dispose");
    Console.ReadLine();
}

这现在可以正常工作了。

你可以做的第二件事是不要使用任务来编写它。使用纯 Rx 代码。

这是备选方案 Dos:

private static IObservable<Unit> Do1() =>
    from x in Observable.Timer(TimeSpan.FromMilliseconds(500))
    from y in Observable.Throw<AccessViolationException>(new AccessViolationException("Oops 1"))
    select Unit.Default;

private static IObservable<Unit> Do2() =>
    from x in Observable.Timer(TimeSpan.FromMilliseconds(600))
    from y in Observable.Throw<AccessViolationException>(new AccessViolationException("Oops 2"))
    select Unit.Default;    

主要代码如下:

using (
    Observable
        .Amb(Do1(), Do2())
        .Subscribe(
            x => { },
            ex => Console.WriteLine($"Error {ex}"), 
            () => Console.WriteLine("Completed Inner")))
{
    Console.WriteLine("Press any key to dispose");
    Console.ReadLine();
}

这也行。

我更喜欢 Rx 的方式。