IObservable.Subscribe(IObserver<T> observer) 重载是为了什么?

What is the IObservable.Subscribe(IObserver<T> observer) overload for?

当我写 .Subscribe 表达式时,我经常发现 Resharper 为我选择了以下重载,位于 mscorlib,Version=4.0.0.0:

namespace System
{
  public interface IObservable<out T>
  {
    IDisposable Subscribe(IObserver<T> observer);
  }
}

这似乎与大多数采用 Action 的重载非常不同,而且它来自 mscorlib 而不是 System.Reactive.*,这是我期望的大部分 Reactive 内容。

这个重载有什么作用? 应该如何使用? IObserver<T>Action 有什么关系? 为什么在 mscorlib 中会出现这种单一重载?

如果您查看 IObserver,您可能会明白原因。该接口包含三个方法(或 Actions),其中最多 "used" 个是 OnNext.

现在让我们看看 Action here 的重载的实现。扩展方法实际上通过将提供的 Action 作为 OnNext 语义为您生成 IObserver

如果您希望处理 OnErrorOnCompleted 通知,您可以为所有接口方法实现提供 Action

需要说明的是,这不是重载,它确实是 Rx 的核心。所有其他 Subscribe 方法,以及与此相关的所有其他运算符,您习惯的实际上是最终调用它的扩展方法。

如果您查看早期文档或 Rx,您会发现创建者将其视为 LINQ 的基于推送的一面。所以很多东西都是你在 LINQ 中看到的东西的镜像。 IObservableIEnumerable的镜像,IObserverIEnumerator的镜像。

但是,因为推与拉相反,所以 Rx 版本与其基于拉的版本相反:

  • IEnumerable 定义了一种生成 IEnumerator 的方法。 IObservable 定义了一种方法 接受 一个 IObserver.
  • 如果您将 IEnumerator.MoveNext() + IEnumerator.Current 视为一个操作,则可以 return 三种方式之一:下一个元素 returned,到达集合末尾, 或抛出异常。同样,IObserver 必须处理三种情况:下一个元素 (OnNext)、流结束 (OnCompleted) 或异常 (OnError)。

Subscribe 中更熟悉的 'overloads' 实际上只是扩展方法,看起来像这样:

public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext)
{
    return Subscribe(onNext, e => {/*onError */}, () => {/*onCompleted*/);
}

public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
    source.Subscribe(new AnonymousObserver<T>(onNext, onError, onCompleted));
}