如何实现一个 "better" Finally Rx 运算符?

How to implement a "better" Finally Rx operator?

最近我意识到 Rx Finally 运算符的行为方式至少对我来说是出乎意料的。我的期望是 finallyAction 抛出的任何错误都会传播到下游的运算符观察者。 las,这不是发生的事情。实际上,运算符 first 将先行序列的完成(或失败)传播给它的观察者,而 then 调用 action,在无法传播操作引发的潜在错误的时间点。所以它在 ThreadPool 上抛出错误并使进程崩溃。这不仅出乎意料,而且问题重重。以下是此行为的最小演示:

Observable
    .Timer(TimeSpan.FromMilliseconds(100))
    .Finally(() => throw new ApplicationException("Oops!"))
    .Subscribe(_ => { }, ex => Console.WriteLine(ex.Message),
        () => Console.WriteLine("Completed"));

Thread.Sleep(1000);

结果:未处理的异常 (Fiddle)

Finally lambda 抛出的异常未由 Subscribe:onError 处理程序处理,因为它是可取的。

此功能(我很想称它为缺陷)在我看来严重限制了 Finally 运算符的实用性。基本上,我只能在我想调用一个预期永远不会失败的操作时使用它,如果它失败,则表明应用程序状态发生灾难性损坏,此时无法恢复。例如,我可以将它用于 Release a SemaphoreSlim(就像我所做的那样 for example), which can only fail if my code has a bug. I am OK with my app crashing in this case. But I've also used it 调用调用者提供的未知操作,该操作可能会失败,并使应用程序崩溃在这种情况下是不可接受的。相反,错误应该传播到下游。所以我在这里问的是如何实现具有相同签名和指定行为的 Finally 变体(我们称之为 FinallySafe)以下:

public static IObservable<TSource> FinallySafe<TSource>(
    this IObservable<TSource> source, Action finallyAction);
  1. finallyAction 应该在 source 序列发出 OnCompletedOnError 通知后调用,但是 此通知传播到观察者之前。
  2. 如果 finallyAction 调用成功完成,应将原始 OnCompleted/OnError 通知传播给观察者。
  3. 如果 finallyAction 调用失败,应将 OnError 通知传播给观察者,其中包含刚刚发生的错误。在这种情况下,应该忽略(不传播)先前的错误,即可能导致 source 完成失败的错误。
  4. FinallySafesource 完成之前取消订阅时,也应调用 finallyAction。当订阅者(观察者)处理订阅时,finallyAction 应该被同步调用,任何错误都应该传播给 Dispose 方法的调用者。
  5. 如果 FinallySafe 被多个观察者订阅,则 finallyAction 应该在每个订阅中调用一次,每个订阅者独立调用,遵循上述规则。并发调用是可以的。
  6. finallyAction 对每个订阅者的调用不应超过一次。

验证:将上面代码片段中的 Finally 替换为 FinallySafe,应该会导致程序不会因未处理的异常而崩溃。

替代方案:我也愿意接受一个答案,该答案提供了关于为什么内置 Finally 运算符的行为比行为更好的合理解释自定义 FinallySafe 运算符,如上所述。

Finally 在序列结束后被调用,并且由于 Rx 合约只允许一个 OnErrorOnCompleted 它不能发出第二个。

但是,如果将 Finally 替换为 Do,您可以获得所需的行为。

试试这个代码:

Observable
    .Timer(TimeSpan.FromMilliseconds(100))
    .Do(_ => { }, () => throw new ApplicationException("Oops!"))
    .Subscribe
        (_ => { },
        ex => Console.WriteLine(ex.Message),
        () => Console.WriteLine("Completed"));

Thread.Sleep(TimeSpan.FromMilliseconds(1000));

如您所愿。

我得到这个输出:

Oops!

如果您想 运行 取消订阅,请使用此扩展方法:

public static class Ext
{
    public static IObservable<T> Unsubscribed<T>(this IObservable<T> source, Action unsubscribed) =>
        Observable.Create<T>(o =>
            new CompositeDisposable(source.Subscribe(o), Disposable.Create(unsubscribed)));
}

这是一个使用示例:

var source = Observable.Never<int>();

var subscription =
    source
        .Unsubscribed(() => Console.WriteLine("Unsubscribed"))
        .Subscribe();

subscription.Dispose();

输出:

Unsubscribed

我阅读了文档,现在我确定了。 finally-运算符将在完成后调用,不应抛出任何异常。

与非响应式编程相比:

StreamReader file = new StreamReader("file.txt");
string ln;  

try {  
   while ((ln = file.ReadLine()) != null) {  
      Console.WriteLine(ln);
   }
}
finally {
   // avoid to throw an exception inside of finally!
   if (file != null) {
      file.close();
   }
}

重要的是不要在 finally.

中抛出异常

下面是如何正确使用它的示例 (fiddle):

using System;
using System.Reactive.Linq;
using System.Threading;

public class Program
{
    public static void Main()
    {
        Observable
            .Range(1,5) // simulates stream-reader
            .Finally(() => Console.WriteLine("Close streamreader"))
            .Do(i => {
                if (i == 5) {
                    throw new ApplicationException("Oops!"); // simulates IO-error
                }
                
                Console.WriteLine("Read " + i);
            })
            .Subscribe(_ => { }, ex => Console.WriteLine(ex.Message),
                () => Console.WriteLine("Completed"));

        Thread.Sleep(1000);
    }
}

我不确定您要做什么(而且我对 c# reactive 还很陌生),但我认为您使用的运算符不正确。

编辑

但如果需要,您可以对其进行修补。在这篇文章中,他们做了一些熟悉的事情。
http://introtorx.com/Content/v1.0.10621.0/11_AdvancedErrorHandling.html

这是 FinallySafe 运算符的实现,具有问题中指定的行为:

/// <summary>
/// Invokes a specified action after the source observable sequence terminates
/// successfully or exceptionally. The action is invoked before the propagation
/// of the source's completion, and any exception thrown by the action is
/// propagated to the observer. The action is also invoked if the observer
/// is unsubscribed before the termination of the source sequence.
/// </summary>
public static IObservable<T> FinallySafe<T>(this IObservable<T> source,
    Action finallyAction)
{
    return Observable.Create<T>(observer =>
    {
        var finallyOnce = Disposable.Create(finallyAction);
        var subscription = source.Subscribe(observer.OnNext, error =>
        {
            try { finallyOnce.Dispose(); }
            catch (Exception ex) { observer.OnError(ex); return; }
            observer.OnError(error);
        }, () =>
        {
            try { finallyOnce.Dispose(); }
            catch (Exception ex) { observer.OnError(ex); return; }
            observer.OnCompleted();
        });
        return new CompositeDisposable(subscription, finallyOnce);
    });
}

finallyAction被分配为Disposable.Create一次性实例的Dispose动作,以确保该动作最多被调用一次。然后通过使用 CompositeDisposable 实例将此一次性订阅与 source 的一次性订阅相结合。

作为旁注,我想解决这个问题,我们是否可以更进一步,并在取消订阅期间向下游传播 finallyAction 的可能错误。在某些情况下这可能是可取的,但不幸的是这是不可能的。首先也是最重要的是,这样做会违反 The Observable Contract 文档中的准则,该准则指出:

When an observer issues an Unsubscribe notification to an Observable, the Observable will attempt to stop issuing notifications to the observer. It is not guaranteed, however, that the Observable will issue no notifications to the observer after an observer issues it an Unsubscribe notification.

所以这样的实现是不符合规范的。更糟糕的是,Observable.Create 方法通过在处理订阅后立即使 observer 静音来强制执行此准则。它通过将观察者封装在 AutoDetachObserver 包装器中来实现。即使我们试图通过从头开始实现 IObservable<T> 类型来规避此限制,任何可以附加在我们不符合要求的 Finally 运算符之后的内置运算符都会使我们的 post-取消订阅 OnError 通知。所以这是不可能的。取消订阅期间的错误无法传播到刚刚请求取消订阅的订阅者。