Subject.OnError 投掷

Subject.OnError throwing

我正在使用 Rx .NET 的 Subject.OnError,它似乎是抛出异常而不是传播异常。我的场景是 Subject 在一个单独的线程上提供数据,调用线程需要在返回数据时做一些事情,还要等待来自 observable 的所有数据的完成,以及向上传播发生的任何异常。

这是一个简化的例子:

class Program
{
    static async Task Main(string[] args)
    {
        var subject = new Subject<bool>();

        Task.Run(async () =>
        {
            await Task.Delay(5000);
            subject.OnError(new Exception()); //This call is throwing!
        });

        subject.Subscribe(e =>
        {
            //Do some data processing here
        });

        try
        {
            //Need to wait for observable to complete before returning to the caller
            await subject.LastOrDefaultAsync();
        }
        catch
        {
            //Do some logging, clean up resources
            throw;
        }
    }
}

如果我删除对 subject.Subscribe() 的调用,代码将按您预期的方式工作,并且在 subject.LastOrDefaultAsync() 处重新抛出异常。然而,随着 Subscribe 的出现,对 subject.OnError() 的调用立即重新抛出异常(不将其传递给可观察对象),这对我来说似乎很奇怪。

如何解决这个问题?

(仅供参考,大量代码已经使用 Subject 编写,因此建议我根本不使用它不是一个可接受的解决方案)

这是一个更简单的例子:

void Main()
{
    var subject = new Subject<bool>();
    subject.Subscribe(b => {/* bool handling code */});
    subject.OnError(new Exception()); //This call is throwing!
}

Subscribe 重载重新抛出它收到的异常。如果您想忽略异常,请执行以下操作:

void Main()
{
    var subject = new Subject<bool>();
    // subject.Subscribe();
    subject.Subscribe(b => {/* bool handling code */}, e => { });
    subject.OnError(new Exception()); //This call is throwing!

}

如果您想查看此来源,请查看此处:https://github.com/dotnet/reactive/blob/master/Rx.NET/Source/src/System.Reactive/Observable.Extensions.cs(第 63 行)。它抛出任何捕获的异常。


编辑

如果您想深入研究这个问题,这里(实际上)是异常处理代码,它最终被 .Subscribe(onNextHandlerOnly) 重载调用:

void Main()
{
    var subject = new Subject<bool>();
    subject.Subscribe(b => b.Dump(), e => { e.Throw(); }, () => {});
    subject.OnError(new Exception()); //This call is throwing!

}

public static class X
{
    public static void Throw(this Exception exception)
    {
         System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(exception).Throw();
    }
}

EDI.Capture 调用使异常的 'source' 看起来像是 OnError 调用,而不是 Subscribe