发布可观察对象时异步创建挂起

Async Create hanging while publishing observable

考虑以下代码:

var xs = Observable.Create<Unit>(async o => 
{
    await Task.Delay(10);
    o.OnError(new Exception());
}).Replay().RefCount();

xs.Subscribe(x => Console.WriteLine(x));
xs.Subscribe(x => Console.WriteLine(x), ex => Console.WriteLine(ex.Message));
await xs.DefaultIfEmpty();

上面的序列不会抛出任何异常并且永远不会完成。

我做了以下观察:

  1. 删除第一个订阅会启用错误传播 - 在 Subscribe 上下文(最后一行)中抛出异常
  2. 删除 .Replay().RefCount() 启用错误传播 - 在 Subscribe 上下文(最后一行)
  3. 中抛出异常
  4. 删除 await Task.Delay(10) 启用错误传播 - 在 OnError 调用中抛出异常(在 Create 方法中)。令人惊讶的是,切换两个 Subscribe 方法会在 Subscribe 上下文(最后一行)抛出异常。

话虽如此,我想问的是以下问题是否是设计使然:

  1. 上述场景中的可观察序列从未完成
  2. 有时在 Create 方法中抛出异常,有时在 Subscribe 上下文中抛出异常。

如果这是设计使然,您会推荐什么解决方法?在这种情况下,如何发布我的序列以便我的所有客户(观察者)都可以安全地处理异常?当前的行为似乎非常随意,尤其是对于库创建者而言。这也让调试变得非常痛苦。请指教

我认为这可能与管道拆卸的动态有关。

这是我们对 Replay(内部使用 ReplaySubject)的预期行为。

但是,错误通知是否在管道拆除之前传播似乎是时间问题。直接使用 ReplaySubject 时,它会按预期工作,即使在 Subscribe.

中使用 OnError
        var xs = new ReplaySubject<Unit>();
        var sxs = Observable.Create<Unit>(async o =>
        {
            await Task.Delay(10);
            o.OnError(new Exception("ERR"));

        }).Subscribe(xs);


        xs.Subscribe(x => Console.WriteLine(x), ex => Console.WriteLine(ex.Message));
        xs.Subscribe(x => Console.WriteLine(x), ex => Console.WriteLine(ex.Message));

        xs.DefaultIfEmpty().Wait();

        Console.WriteLine("-end-");
        Console.ReadLine();

使用 IConnectableObservable 似乎会在拆卸过程中导致 Heisenbugs。

第一个回答你的问题:

  1. 不,这不是设计使然。这是一个错误。
  2. "Workaround" 就是不要混用 TPL 和 Reactive。你可以打出这样有趣的东西。

以下按预期工作:

var xs = Observable.Throw<Unit>(new Exception())
    .Delay(TimeSpan.FromMilliseconds(10))
    .Replay()
    .RefCount();

这会导致在第一个 .Subscribeawait xs.DefaultIfEmpty() 调用时引发异常。由于延迟,您得到两个异常:多个线程 运行。


至于为什么会这样,这是一个开始:

第一个 Subscribe 代码基本上转换为以下代码。 (See source):

xs.Subscribe(x => Console.WriteLine(x), Stubs.Throw, Stubs.Nop);

public static class Stubs
{

    public static readonly Action Nop = delegate
    {
    };

    public static readonly Action<Exception> Throw = delegate (Exception ex)
    {
        var edi = ExceptionDispatchInfo.Capture(ex);
        edi.Throw();
    };
}

如果您在 Stubs class 中设置断点,您会看到它进入那里并尝试抛出异常。然而,异常并没有冒出来,很可能是由于一些奇怪的 TPL/ReplaySubject 交互。

当前版本的 System.Reactive 库 (5.0.0) 仍然存在该问题。我在确定其原因方面取得了一些进展。该问题与忽略 onError 处理程序的顽皮订阅者有关。当省略此处理程序时,Rx 会在其位置放置如下内容:

void OnError(Exception error) => throw error;

错误只是在调用处理程序的线程上同步重新抛出!这意味着在顽皮的订阅者之后没有其他订阅者会收到 OnError 通知,因为 observable 会顺序和同步地通知其所有订阅者。

var subject = new Subject<Unit>();
subject.Subscribe(); // Naughty subscriber
subject.OnError(new ApplicationException("@")); // Throws synchronously

(Try it on Fiddle)

通常这不是问题,因为大多数来自内置 Rx 运算符的通知都是从 ThreadPool 线程异步调用的。这意味着客户端代码无法捕获由顽皮订阅者引起的异常。因此,异常仍未得到处理,并导致进程崩溃。对于商业应用程序来说,这不是一个理想的行为,但它肯定比挂起的过程要好得多,等待一个永远不会到来的通知。我这样说是为了说明我们在这里试图解决的问题是如何使应用程序持续崩溃。当发生错误并且我们有一个顽皮的订阅者时,使进程崩溃是我们期望的行为。因此,让我们也找到一种方法来实现 Observable.Create

本机 Observable.Create(Func<IObserver<TResult>, Task>) 方法的问题是调用异步 lambda 会产生 Task,并且任务具有嵌入式错误处理功能。因此,lambda 内部的异常不会自动升级为导致进程崩溃的未处理异常。我修复 Observable.Create 方法的第一次尝试是 await 任务,捕获异常,尝试通过 observer.OnError 方法再次传播它,如果此尝试失败,则重新抛出异常ThreadPool 并使进程永远崩溃。但是这个想法是有缺陷的,因为只有 observer.OnError 的第一次调用会抛出。随后的调用什么都不做,因为内置的观察者尊重 Rx 契约,它不允许多个 OnError 通知。所以 try/catch 必须专门针对 observer.OnError 调用,而不是一般的整个异步 lambda。这意味着我们需要提供的观察者的包装器。下面的实现是基于这个想法:

public static IObservable<TSource> CreateObservableEx<TSource>(
    Func<IObserver<TSource>, CancellationToken, Task> subscribeAsync)
{
    return Observable.Create<TSource>(async (observer, cancellationToken) =>
    {
        var innerObserver = Observer.Create<TSource>(observer.OnNext,
            PropagateError, observer.OnCompleted);

        try { await subscribeAsync(innerObserver, cancellationToken); }
        catch (Exception ex)
        {
            PropagateError(ex);
        }

        void PropagateError(Exception error)
        {
            try { observer.OnError(error); }
            catch (Exception ex)
            {
                ThreadPool.QueueUserWorkItem(_ => ExceptionDispatchInfo.Throw(ex));
            }
        }
    });
}

用法示例:

var xs = CreateObservableEx<Unit>(async (o, ct) =>
{
    await Task.Delay(10, ct);
    o.OnError(new Exception());
}).Replay().RefCount();

xs.Subscribe(x => Console.WriteLine(x));
xs.Subscribe(x => Console.WriteLine(x), ex => Console.WriteLine(ex.Message));
await xs.DefaultIfEmpty();

(Try it on Fiddle)

输出:

System.Exception (Unhandled)