在 RxJS 中,Observer 是否被注入到 Observable 执行中?

In RxJS, does Observer get injected into Observable execution?

我已经通读了 ReactiveX 文档好几遍,但仍然无法理解 Observer 订阅 时到底发生了什么可观察.

让我们看一个简单的例子:

import { Observable } from 'rxjs'; 

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.complete();
});

const observer = {
  next: (x) => console.log('got value ' + x),
  error: (err) => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done')
};

observable.subscribe(observer);

StackBlitz code.


我的问题:

传递给 Observablesubscriber 对象来自哪里?

来自RxJS documentation

It is not a coincidence that observable.subscribe and subscribe in new Observable(function subscribe(subscriber) {...}) have the same name. In the library, they are different, but for practical purposes you can consider them conceptually equal.

所以,显然在 Observable 构造函数(subscriber)中传递给 subscribe 回调的对象是 不是实际上是observer对象。至少如果你按照上面关于图书馆实际工作方式的引述的话就不会。

如果传入的不是observer对象,那么subscriber.next(1)subscribe.complete()到底在调用什么?它如何连接到 observer 中的 next 属性?


澄清编辑:

我知道如何利用 RxJS 并且确实可以 概念上 想象 Observer 被注入(如引用所说)。但是,我来这里是为了了解它 实际上 是如何工作的。

不,没有将观察者注入到可观察对象中。

AFAICT,混淆源于这样一个事实,即 new Observable(...) 语法与其说是一个有用的模式,不如说是一个低级工厂。

它或多或少是更直接的实现所使用的机制,如 of(value1, value2, ..., valueN)from(enumeration)fromEvent(...)

这些方法是您应该关注的实际用例。

在幕后,所有这些方法都将某种同步或异步值或交互 桥接到 可观察流的美妙世界。为此,他们以某种方式 一个合适的观察者:他们生成项目并将它们放入流中。为此,他们使用了一个名为 next 的函数。就像 Observer 实现中的方法一样,bacause 实际上以完全相同的方式被调用。

具体来说,你可以在这里查看订阅方法的实现:

https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts

如果您想了解订阅期间实际发生了什么,我建议您实际查看代码。 但是,IMO,你应该在熟悉各种 Observable 创建函数之后再尝试。

希望对您有所帮助。

Observable创建流程如下:

作者定义了一个Observable(这里手动加上new,方便说明):

const myObservable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
  return function tearDownLogic() {
    console.log('runs when Observable for whatever reason is done (complete, error, or unsubscribed)')
  }
});

上面传递给Observablesubscribe回调被Observable constructor保存在本地:

constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
  if (subscribe) {
    this._subscribe = subscribe;
  }
}

因此,我们将由我们或任何其他 pre-made Observable 定义的整个 subscribe 函数保存下来供以后执行。

Observer 可以以多种形式之一传递给 subscribe 回调。或者,直接作为一到三个函数(nexterrorcomplete),或者作为 object 使用相同的三种方法中的一种或多种。为了解释的目的,我们将实现最后一个更详细的选项:

const observer = {
  next(v) {
    console.log(v);
  }
  error(err) {
    console.log(err);
  }
  complete() {
    console.log('Observable has now completed and can no longer emit values to observer');
  }
}

现在,有趣的部分开始了。我们将 observer 传递给 Observable.subscribe(..) 方法:

myObserver.subscribe(observer);

subscribe method 看起来像这样:

  subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
            error?: (error: any) => void,
            complete?: () => void): Subscription {


    const { operator } = this;
    const sink = toSubscriber(observerOrNext, error, complete);


    if (operator) {
      sink.add(operator.call(sink, this.source));
    } else {
      sink.add(
        this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
        this._subscribe(sink) :
        this._trySubscribe(sink)
      );
    }


    if (config.useDeprecatedSynchronousErrorHandling) {
      if (sink.syncErrorThrowable) {
        sink.syncErrorThrowable = false;
        if (sink.syncErrorThrown) {
          throw sink.syncErrorValue;
        }
      }
    }


    return sink;
  }

简要描述,subscribe方法:

  1. 收到observer之前讨论过的一种形式
  2. toSubscriber 将观察者转换为 Subscriber object,无论其传入形式如何(Subscriber 实例保存在 sink 变量中)
  3. 注意:operator 变量是 undefined,除非您订阅了运算符。因此,只需忽略 operator
  4. 周围的 if 语句
  5. Subscriber 扩展(prototype-linked 到)Subscription object,它的原型有两个重要的方法:unsubscribe()add()
  6. add(..) 用于将“拆卸逻辑”(函数)添加到 Observable,当运行 Observable 完成取消订阅。它将接受传递给它的任何函数,将其包装在 Subscription object 中,并将该函数放入 Subscription_unsubscribe 变量中。这个 Subscription 保存在我们上面创建的 Subscriber 上,在一个名为 _subscriptions 的变量中。如前所述,我们这样做是为了在 Subscriber 取消订阅 完成 时,所有 add() 拆解逻辑执行
  7. 作为旁注,Observable.subscribe() returns Subscriber 实例。因此,您可以随时对其调用 mySubscriber.add( // some tear down logic) 以添加将在 Observable 完成 取消订阅 [=118] 时执行的函数=]
  8. 现在包含一个重要部分:this._trySubscribe(sink) 运行s(在 add() 内,作为参数)。 _trySubscribe(..) 是 运行 先前由 Observable 构造函数保存的 subscribe 回调函数。重要的是,它传入 sink(我们新的 Subscriber 实例)作为 Observable 回调的回调。也就是说,当Observable里面的subscriber.next(1)执行的时候,我们实际上是在sink(Subscriber)实例中执行next(1)(next()是在 Subscriber 的原型上)。

现在,我到此结束了。在 toSubscribe 内部和 取消订阅 过程中还有更多详细信息,但这些不在本问答的范围内。

简而言之,为了回答标题中的问题,Observer 确实被传递到 Observable,只是在转换为统一的 [=28] 之后=] object.

希望这对以后的其他人有所帮助。