rxjs:为什么当另一个流使用 take(1) 时流发出两次

rxjs: why the stream emit twice when another stream use take(1)

当我使用take(1)时,它会console.log两次1,如下代码:

  const a$ = new BehaviorSubject(1).pipe(publishReplay(1), refCount());
  a$.pipe(take(1)).subscribe();
  a$.subscribe((v) => console.log(v)); // emit twice (1 1)

但是当我删除 take(1) 或删除 publishReplay(1), refCount() 时,它符合我的预期(只有一个 1 console.log)。

  const a$ = new BehaviorSubject(1).pipe(publishReplay(1), refCount());
  a$.subscribe();
  a$.subscribe((v) => console.log(v)); // emit 1

  // or

  const a$ = new BehaviorSubject(1);
  a$.pipe(take(1)).subscribe();
  a$.subscribe((v) => console.log(v)); // emit 1

为什么?

版本:rxjs 6.5.2

我们先来看看publishReplay是怎样的defined:

const subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);

return (source: Observable<T>) => multicast(() => subject, selector!)(source) as ConnectableObservable<R>;

multicast() 将 return 一个 ConnectableObservable,这是一个公开 connect 方法的可观察对象。与 refCount 结合使用,当第一个订阅者注册时源将被订阅,当没有更多活跃订阅者时将自动取消订阅源。 多播 行为是通过在数据消费者和数据生产者之间放置一个Subject(或任何类型的主题)来实现的。

() => subject 意味着每次订阅源时都会使用 相同的主题实例 ,这是为什么你得到那个的一个重要方面行为。

const src$ = (new BehaviorSubject(1)).pipe(
  publishReplay(1), refCount() // 1 1
);

src$.pipe(take(1)).subscribe()

src$.subscribe(console.log)

让我们看看上面代码片段的流程是什么:

src$.pipe(take(1)).subscribe()

由于它是第一个订阅者,源 (BehaviorSubject) 将被订阅。发生这种情况时,它将发出 1,这将必须 经历 使用中的 ReplaySubject。然后,主题将该值传递给它的订阅者(例如 take(1))。但是因为您正在使用 publishReplay(1)(1 表示 bufferSize),该值将由该主题缓存。

src$.subscribe(console.log)

方式refCount works是先订阅正在使用的Subject,再订阅源:

const refCounter = new RefCountSubscriber(subscriber, connectable);

// Subscribe to the subject in use
const subscription = connectable.subscribe(refCounter); 

if (!refCounter.closed) {
  // Subscribe to the source
  (<any> refCounter).connection = connectable.connect();
}

顺便说一句,here's what happens on connectable.subscribe

_subscribe(subscriber: Subscriber<T>) {
  return this.getSubject().subscribe(subscriber);
}

由于主题是 ReplaySubject,它会将缓存的值发送到其新注册的订阅者(因此 first 1)。然后,因为之前没有订阅者(由于 take(1),它在第一次发射后完成),源将再次取消订阅,这应该可以解释为什么你得到 second 1.


如果您只想获得一个 1 值,您可以通过确保每次订阅源时都使用不同的主题来实现:


const src$ = (new BehaviorSubject(1)).pipe(
  shareReplay({ bufferSize:1, refCount: true }) // 1
);

src$.pipe(take(1)).subscribe()

src$.subscribe(console.log)

StackBlitz.