rxjs:为什么当另一个流使用 take(1) 时流发出两次
rxjs: why the stream emit twice when another stream use take(1)
当我使用take(1)
时,它会console.log两次1
,如下代码:
const a$ = new BehaviorSubject(1).pipe(publishReplay(1), refCount());
a$.pipe(take(1)).subscribe();
a$.subscribe((v) => console.log(v)); // emit twice (1 1)
但是当我删除 take(1)
或删除 publishReplay(1), refCount()
时,它符合我的预期(只有一个 1
console.log)。
const a$ = new BehaviorSubject(1).pipe(publishReplay(1), refCount());
a$.subscribe();
a$.subscribe((v) => console.log(v)); // emit 1
// or
const a$ = new BehaviorSubject(1);
a$.pipe(take(1)).subscribe();
a$.subscribe((v) => console.log(v)); // emit 1
为什么?
版本:rxjs 6.5.2
我们先来看看publishReplay
是怎样的defined:
const subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
return (source: Observable<T>) => multicast(() => subject, selector!)(source) as ConnectableObservable<R>;
multicast()
将 return 一个 ConnectableObservable
,这是一个公开 connect
方法的可观察对象。与 refCount
结合使用,当第一个订阅者注册时源将被订阅,当没有更多活跃订阅者时将自动取消订阅源。 多播 行为是通过在数据消费者和数据生产者之间放置一个Subject
(或任何类型的主题)来实现的。
() => subject
意味着每次订阅源时都会使用 相同的主题实例 ,这是为什么你得到那个的一个重要方面行为。
const src$ = (new BehaviorSubject(1)).pipe(
publishReplay(1), refCount() // 1 1
);
src$.pipe(take(1)).subscribe()
src$.subscribe(console.log)
让我们看看上面代码片段的流程是什么:
src$.pipe(take(1)).subscribe()
由于它是第一个订阅者,源 (BehaviorSubject
) 将被订阅。发生这种情况时,它将发出 1
,这将必须 经历 使用中的 ReplaySubject
。然后,主题将该值传递给它的订阅者(例如 take(1)
)。但是因为您正在使用 publishReplay(1)
(1 表示 bufferSize
),该值将由该主题缓存。
src$.subscribe(console.log)
方式refCount
works是先订阅正在使用的Subject
,再订阅源:
const refCounter = new RefCountSubscriber(subscriber, connectable);
// Subscribe to the subject in use
const subscription = connectable.subscribe(refCounter);
if (!refCounter.closed) {
// Subscribe to the source
(<any> refCounter).connection = connectable.connect();
}
顺便说一句,here's what happens on connectable.subscribe
:
_subscribe(subscriber: Subscriber<T>) {
return this.getSubject().subscribe(subscriber);
}
由于主题是 ReplaySubject
,它会将缓存的值发送到其新注册的订阅者(因此 first 1
)。然后,因为之前没有订阅者(由于 take(1)
,它在第一次发射后完成),源将再次取消订阅,这应该可以解释为什么你得到 second 1
.
如果您只想获得一个 1
值,您可以通过确保每次订阅源时都使用不同的主题来实现:
const src$ = (new BehaviorSubject(1)).pipe(
shareReplay({ bufferSize:1, refCount: true }) // 1
);
src$.pipe(take(1)).subscribe()
src$.subscribe(console.log)
当我使用take(1)
时,它会console.log两次1
,如下代码:
const a$ = new BehaviorSubject(1).pipe(publishReplay(1), refCount());
a$.pipe(take(1)).subscribe();
a$.subscribe((v) => console.log(v)); // emit twice (1 1)
但是当我删除 take(1)
或删除 publishReplay(1), refCount()
时,它符合我的预期(只有一个 1
console.log)。
const a$ = new BehaviorSubject(1).pipe(publishReplay(1), refCount());
a$.subscribe();
a$.subscribe((v) => console.log(v)); // emit 1
// or
const a$ = new BehaviorSubject(1);
a$.pipe(take(1)).subscribe();
a$.subscribe((v) => console.log(v)); // emit 1
为什么?
版本:rxjs 6.5.2
我们先来看看publishReplay
是怎样的defined:
const subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
return (source: Observable<T>) => multicast(() => subject, selector!)(source) as ConnectableObservable<R>;
multicast()
将 return 一个 ConnectableObservable
,这是一个公开 connect
方法的可观察对象。与 refCount
结合使用,当第一个订阅者注册时源将被订阅,当没有更多活跃订阅者时将自动取消订阅源。 多播 行为是通过在数据消费者和数据生产者之间放置一个Subject
(或任何类型的主题)来实现的。
() => subject
意味着每次订阅源时都会使用 相同的主题实例 ,这是为什么你得到那个的一个重要方面行为。
const src$ = (new BehaviorSubject(1)).pipe(
publishReplay(1), refCount() // 1 1
);
src$.pipe(take(1)).subscribe()
src$.subscribe(console.log)
让我们看看上面代码片段的流程是什么:
src$.pipe(take(1)).subscribe()
由于它是第一个订阅者,源 (BehaviorSubject
) 将被订阅。发生这种情况时,它将发出 1
,这将必须 经历 使用中的 ReplaySubject
。然后,主题将该值传递给它的订阅者(例如 take(1)
)。但是因为您正在使用 publishReplay(1)
(1 表示 bufferSize
),该值将由该主题缓存。
src$.subscribe(console.log)
方式refCount
works是先订阅正在使用的Subject
,再订阅源:
const refCounter = new RefCountSubscriber(subscriber, connectable);
// Subscribe to the subject in use
const subscription = connectable.subscribe(refCounter);
if (!refCounter.closed) {
// Subscribe to the source
(<any> refCounter).connection = connectable.connect();
}
顺便说一句,here's what happens on connectable.subscribe
:
_subscribe(subscriber: Subscriber<T>) {
return this.getSubject().subscribe(subscriber);
}
由于主题是 ReplaySubject
,它会将缓存的值发送到其新注册的订阅者(因此 first 1
)。然后,因为之前没有订阅者(由于 take(1)
,它在第一次发射后完成),源将再次取消订阅,这应该可以解释为什么你得到 second 1
.
如果您只想获得一个 1
值,您可以通过确保每次订阅源时都使用不同的主题来实现:
const src$ = (new BehaviorSubject(1)).pipe(
shareReplay({ bufferSize:1, refCount: true }) // 1
);
src$.pipe(take(1)).subscribe()
src$.subscribe(console.log)