rxjs - 通过主题订阅停止可观察

rxjs - stop observable with a subject subscription

我创建了一个 Observable 生成可能无限量的数据(例如计时器)。此数据通过主题访问,因此多个观察者将收到相同的值。

如何停止 Observable 产生新的值? (不修改 Observable 的实现)

// custom Observable, to visualize internal behavior
const timer$ = new rxjs.Observable(subscriber => {
  console.log("observable init");

  var counter = 0;
  const intervalId = setInterval(() => {
    ++counter;
    console.log("observable %s",counter);
    subscriber.next(counter);
  }, 1000);

  return () => {
    console.log("observable teardown");
    clearTimeout(intervalId);
  }
});

// subscribe through a subject
const subject$ = new rxjs.Subject();
timer$.subscribe(subject$);

const subscription = subject$.subscribe(value => console.log("observer %s", value));

// cancel subscription
setTimeout(() => {
  console.log("unsubscribe observer");
  subscription.unsubscribe();
  // TODO how to stop Observable generating new values?
}, 3000);

jsfiddle: https://jsfiddle.net/gy4tfd5w/

幸运的是,在 RxJS 中有一种专门而优雅的方法来解决这个问题。

您的要求是

multiple observers [...] receive the same values

这叫做 multicast observable 并且有某些 operators 用于从普通 "cold" observable 创建一个.

例如,不是直接创建 Subject 的实例,而是可以将可观察到的源通过管道传递给 share 运算符,它会为您创建 Subjectshare 的文档如下:

Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will unsubscribe from the source Observable.

最后一句显示了sharesource$.subscribe(subject)之间的细微差别。 share 保留所谓的 refCount,当没有订阅者离开时,它会自动从其来源取消订阅 Subject

应用于您的代码,它看起来像这样:

const timer$ = 
    new rxjs.Observable(subscriber => {// your unchanged implementation})
    .pipe(
        share()
    );

const subscription = timer$.subscribe(value => console.log("observer %s", value));

这是包含您的示例代码的完整版本:

https://jsfiddle.net/50q746ad/

顺便说一句,share 不是唯一执行多播的运营商。 There are great learning resources 更深入地讨论了该主题。

因此,经过一些惩罚性研究后,我为这个问题添加了自己的 npm 库。

Improves previous answer by NOT having to add any extra convolution variables and ease of use.

这是我阻止疯狂可观察中流的解决方案。

通过使用信号 Subject 和 rxjs 运算符:takeUntil

例子

const stopSignal$ = new Subject();

infinitelyGenerating$
  .pipe(takeUntil(stopSignal$))
  .subscribe(val => {
    if (val === 'bad') {
      stopSignal$.next()
    }
  })