在 RxJS 中,Observer 是否被注入到 Observable 执行中?
In RxJS, does Observer get injected into Observable execution?
我已经通读了 ReactiveX 文档好几遍,但仍然无法理解 Observer 订阅 时到底发生了什么可观察.
让我们看一个简单的例子:
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.complete();
});
const observer = {
next: (x) => console.log('got value ' + x),
error: (err) => console.error('something wrong occurred: ' + err),
complete: () => console.log('done')
};
observable.subscribe(observer);
我的问题:
传递给 Observable 的 subscriber
对象来自哪里?
It is not a coincidence that observable.subscribe
and subscribe
in
new Observable(function subscribe(subscriber) {...})
have the same name.
In the library, they are different, but for practical purposes you can
consider them conceptually equal.
所以,显然在 Observable 构造函数(subscriber
)中传递给 subscribe 回调的对象是 不是实际上是observer
对象。至少如果你按照上面关于图书馆实际工作方式的引述的话就不会。
如果传入的不是observer
对象,那么subscriber.next(1)
和subscribe.complete()
到底在调用什么?它如何连接到 observer
中的 next
属性?
澄清编辑:
我知道如何利用 RxJS 并且确实可以 概念上 想象 Observer 被注入(如引用所说)。但是,我来这里是为了了解它 实际上 是如何工作的。
不,没有将观察者注入到可观察对象中。
AFAICT,混淆源于这样一个事实,即 new Observable(...)
语法与其说是一个有用的模式,不如说是一个低级工厂。
它或多或少是更直接的实现所使用的机制,如 of(value1, value2, ..., valueN)
、from(enumeration)
和 fromEvent(...)
。
这些方法是您应该关注的实际用例。
在幕后,所有这些方法都将某种同步或异步值或交互 桥接到 可观察流的美妙世界。为此,他们以某种方式 像 一个合适的观察者:他们生成项目并将它们放入流中。为此,他们使用了一个名为 next
的函数。就像 Observer
实现中的方法一样,bacause 实际上以完全相同的方式被调用。
具体来说,你可以在这里查看订阅方法的实现:
https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts
如果您想了解订阅期间实际发生了什么,我建议您实际查看代码。
但是,IMO,你应该在熟悉各种 Observable 创建函数之后再尝试。
希望对您有所帮助。
Observable
创建流程如下:
作者定义了一个Observable
(这里手动加上new
,方便说明):
const myObservable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.complete();
return function tearDownLogic() {
console.log('runs when Observable for whatever reason is done (complete, error, or unsubscribed)')
}
});
上面传递给Observable
的subscribe
回调被Observable constructor保存在本地:
constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
if (subscribe) {
this._subscribe = subscribe;
}
}
因此,我们将由我们或任何其他 pre-made Observable
定义的整个 subscribe
函数保存下来供以后执行。
Observer 可以以多种形式之一传递给 subscribe
回调。或者,直接作为一到三个函数(next、error、complete),或者作为 object 使用相同的三种方法中的一种或多种。为了解释的目的,我们将实现最后一个更详细的选项:
const observer = {
next(v) {
console.log(v);
}
error(err) {
console.log(err);
}
complete() {
console.log('Observable has now completed and can no longer emit values to observer');
}
}
现在,有趣的部分开始了。我们将 observer
传递给 Observable.subscribe(..)
方法:
myObserver.subscribe(observer);
subscribe method 看起来像这样:
subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void): Subscription {
const { operator } = this;
const sink = toSubscriber(observerOrNext, error, complete);
if (operator) {
sink.add(operator.call(sink, this.source));
} else {
sink.add(
this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
this._subscribe(sink) :
this._trySubscribe(sink)
);
}
if (config.useDeprecatedSynchronousErrorHandling) {
if (sink.syncErrorThrowable) {
sink.syncErrorThrowable = false;
if (sink.syncErrorThrown) {
throw sink.syncErrorValue;
}
}
}
return sink;
}
简要描述,subscribe
方法:
- 收到
observer
之前讨论过的一种形式
toSubscriber
将观察者转换为 Subscriber
object,无论其传入形式如何(Subscriber
实例保存在 sink
变量中)
- 注意:
operator
变量是 undefined
,除非您订阅了运算符。因此,只需忽略 operator
周围的 if
语句
Subscriber
扩展(prototype-linked 到)Subscription
object,它的原型有两个重要的方法:unsubscribe()
、add()
add(..)
用于将“拆卸逻辑”(函数)添加到 Observable
,当运行 Observable
完成 或 取消订阅。它将接受传递给它的任何函数,将其包装在 Subscription
object 中,并将该函数放入 Subscription
的 _unsubscribe
变量中。这个 Subscription
保存在我们上面创建的 Subscriber
上,在一个名为 _subscriptions
的变量中。如前所述,我们这样做是为了在 Subscriber
被 取消订阅 或 完成 时,所有 add()
拆解逻辑执行
- 作为旁注,
Observable.subscribe()
returns Subscriber
实例。因此,您可以随时对其调用 mySubscriber.add( // some tear down logic)
以添加将在 Observable
完成 或 取消订阅 [=118] 时执行的函数=]
- 现在包含一个重要部分:
this._trySubscribe(sink)
运行s(在 add()
内,作为参数)。 _trySubscribe(..)
是 运行 先前由 Observable
构造函数保存的 subscribe
回调函数。重要的是,它传入 sink
(我们新的 Subscriber
实例)作为 Observable
回调的回调。也就是说,当Observable
里面的subscriber.next(1)
执行的时候,我们实际上是在sink
(Subscriber
)实例中执行next(1)
(next()
是在 Subscriber
的原型上)。
现在,我到此结束了。在 toSubscribe
内部和 取消订阅 过程中还有更多详细信息,但这些不在本问答的范围内。
简而言之,为了回答标题中的问题,Observer 确实被传递到 Observable
,只是在转换为统一的 [=28] 之后=] object.
希望这对以后的其他人有所帮助。
我已经通读了 ReactiveX 文档好几遍,但仍然无法理解 Observer 订阅 时到底发生了什么可观察.
让我们看一个简单的例子:
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.complete();
});
const observer = {
next: (x) => console.log('got value ' + x),
error: (err) => console.error('something wrong occurred: ' + err),
complete: () => console.log('done')
};
observable.subscribe(observer);
我的问题:
传递给 Observable 的 subscriber
对象来自哪里?
It is not a coincidence that
observable.subscribe
andsubscribe
innew Observable(function subscribe(subscriber) {...})
have the same name. In the library, they are different, but for practical purposes you can consider them conceptually equal.
所以,显然在 Observable 构造函数(subscriber
)中传递给 subscribe 回调的对象是 不是实际上是observer
对象。至少如果你按照上面关于图书馆实际工作方式的引述的话就不会。
如果传入的不是observer
对象,那么subscriber.next(1)
和subscribe.complete()
到底在调用什么?它如何连接到 observer
中的 next
属性?
澄清编辑:
我知道如何利用 RxJS 并且确实可以 概念上 想象 Observer 被注入(如引用所说)。但是,我来这里是为了了解它 实际上 是如何工作的。
不,没有将观察者注入到可观察对象中。
AFAICT,混淆源于这样一个事实,即 new Observable(...)
语法与其说是一个有用的模式,不如说是一个低级工厂。
它或多或少是更直接的实现所使用的机制,如 of(value1, value2, ..., valueN)
、from(enumeration)
和 fromEvent(...)
。
这些方法是您应该关注的实际用例。
在幕后,所有这些方法都将某种同步或异步值或交互 桥接到 可观察流的美妙世界。为此,他们以某种方式 像 一个合适的观察者:他们生成项目并将它们放入流中。为此,他们使用了一个名为 next
的函数。就像 Observer
实现中的方法一样,bacause 实际上以完全相同的方式被调用。
具体来说,你可以在这里查看订阅方法的实现:
https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts
如果您想了解订阅期间实际发生了什么,我建议您实际查看代码。 但是,IMO,你应该在熟悉各种 Observable 创建函数之后再尝试。
希望对您有所帮助。
Observable
创建流程如下:
作者定义了一个Observable
(这里手动加上new
,方便说明):
const myObservable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.complete();
return function tearDownLogic() {
console.log('runs when Observable for whatever reason is done (complete, error, or unsubscribed)')
}
});
上面传递给Observable
的subscribe
回调被Observable constructor保存在本地:
constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
if (subscribe) {
this._subscribe = subscribe;
}
}
因此,我们将由我们或任何其他 pre-made Observable
定义的整个 subscribe
函数保存下来供以后执行。
Observer 可以以多种形式之一传递给 subscribe
回调。或者,直接作为一到三个函数(next、error、complete),或者作为 object 使用相同的三种方法中的一种或多种。为了解释的目的,我们将实现最后一个更详细的选项:
const observer = {
next(v) {
console.log(v);
}
error(err) {
console.log(err);
}
complete() {
console.log('Observable has now completed and can no longer emit values to observer');
}
}
现在,有趣的部分开始了。我们将 observer
传递给 Observable.subscribe(..)
方法:
myObserver.subscribe(observer);
subscribe method 看起来像这样:
subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void): Subscription {
const { operator } = this;
const sink = toSubscriber(observerOrNext, error, complete);
if (operator) {
sink.add(operator.call(sink, this.source));
} else {
sink.add(
this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
this._subscribe(sink) :
this._trySubscribe(sink)
);
}
if (config.useDeprecatedSynchronousErrorHandling) {
if (sink.syncErrorThrowable) {
sink.syncErrorThrowable = false;
if (sink.syncErrorThrown) {
throw sink.syncErrorValue;
}
}
}
return sink;
}
简要描述,subscribe
方法:
- 收到
observer
之前讨论过的一种形式 toSubscriber
将观察者转换为Subscriber
object,无论其传入形式如何(Subscriber
实例保存在sink
变量中)- 注意:
operator
变量是undefined
,除非您订阅了运算符。因此,只需忽略operator
周围的 Subscriber
扩展(prototype-linked 到)Subscription
object,它的原型有两个重要的方法:unsubscribe()
、add()
add(..)
用于将“拆卸逻辑”(函数)添加到Observable
,当运行Observable
完成 或 取消订阅。它将接受传递给它的任何函数,将其包装在Subscription
object 中,并将该函数放入Subscription
的_unsubscribe
变量中。这个Subscription
保存在我们上面创建的Subscriber
上,在一个名为_subscriptions
的变量中。如前所述,我们这样做是为了在Subscriber
被 取消订阅 或 完成 时,所有add()
拆解逻辑执行- 作为旁注,
Observable.subscribe()
returnsSubscriber
实例。因此,您可以随时对其调用mySubscriber.add( // some tear down logic)
以添加将在Observable
完成 或 取消订阅 [=118] 时执行的函数=] - 现在包含一个重要部分:
this._trySubscribe(sink)
运行s(在add()
内,作为参数)。_trySubscribe(..)
是 运行 先前由Observable
构造函数保存的subscribe
回调函数。重要的是,它传入sink
(我们新的Subscriber
实例)作为Observable
回调的回调。也就是说,当Observable
里面的subscriber.next(1)
执行的时候,我们实际上是在sink
(Subscriber
)实例中执行next(1)
(next()
是在Subscriber
的原型上)。
if
语句
现在,我到此结束了。在 toSubscribe
内部和 取消订阅 过程中还有更多详细信息,但这些不在本问答的范围内。
简而言之,为了回答标题中的问题,Observer 确实被传递到 Observable
,只是在转换为统一的 [=28] 之后=] object.
希望这对以后的其他人有所帮助。