正确使用 takeUntil() 停止 observable

Correctly use takeUntil() for stopping observable

我正在尝试创建 2 个基本上执行逆运算的可观察对象。它是一个服务发现端点,因此在启动应用程序时,它必须尝试注册到服务发现,直到成功为止。所以我想创建一个这样的可观察对象:

const create$ = Rx.Observable.create((observer) => {
    observer.next('Trying to create observation');
    sp.put(endpoint, { json: true }, (err, res, payload) => {
      err ? observer.error(err) : observer.complete();
    });
  });

而当应用程序在正常关闭时,我想做反向操作。像这样:

const delete$ = Rx.Observable.create((observer) => {
    console.log('deleted subscribed');
    observer.next('Trying to delete observation');
    sp.delete(endpoint, { json: true }, (err, res, payload) => {
      err ? observer.error(err) : observer.complete();
    });
  });

所以我决定创建一个函数,该函数 returns 一个具有 .create().delete() 的对象。我要解决的问题是,当应用程序启动并尝试注册时,但无法到达服务发现端点,稍后应用程序启动它的正常关闭过程并调用 .delete() 操作,然后 .create() 操作不应该再运行了。

function observe({ url, version, serviceName }) {
  const endpoint = `/endpoint/${serviceName}/${version}/${encodeURIComponent(url)}`;

  const create$ = Rx.Observable.create((observer) => {
    observer.next('Trying to create observation');
    sp.put(endpoint, { json: true }, (err, res, payload) => {
      err ? observer.error(err) : observer.complete();
    });
  });

  const delete$ = Rx.Observable.create((observer) => {
    console.log('deleted subscribed');
    observer.next('Trying to delete observation');
    sp.delete(endpoint, { json: true }, (err, res, payload) => {
      err ? observer.error(err) : observer.complete();
    });
  });

  return {
    create() {
      return create$.retry(Number.POSITIVE_INFINITY).takeUntil(delete$); // This is where I would want to takeUntil()
    },
    delete({ interval = 5000, times = 0 } = {}) {
      return delete$.retry(times);
    },
  }
}

我遇到的问题是,当使用 .takeUntil() 时,它订阅了 delete$,它开始执行删除操作,其效果是立即停止 create$ observable。

我试过 takeUntil(Observable.merge(Observable.never(), delete$)) 但它同时订阅了两者,所以不起作用。我也尝试过 takeUntil(Observable.concat(Observable.never(), delete$)) 第一个永远不会结束 (:P),第二个永远不会被订阅。

大多数 Observable 是 coldunicast。这是一个有点冗长的话题,所以我将在很大程度上遵从这篇伟大的文章 Hot vs Cold Observables,但总结一下:

COLD is when your observable creates the producer

// COLD
var cold = new Observable((observer) => {
  var producer = new Producer();
  // have observer listen to producer here
});

HOT is when your observable closes over the producer

// HOT
var producer = new Producer();
var hot = new Observable((observer) => {
  // have observer listen to producer here
});

在您的示例中,这是一个关键的区别,因为 create$delete$ 都是冷的。因此,正如您发现的那样,向 takeUntil 提供 delete$ 会导致订阅 delete$,从而启动请求。

如果您想保持代码的 structure/API 不变,实现此目的的一种方法是使用被视为某种 "notifier" 的主题。主题是多播和 "hot"(即使他们自己不做任何事情)。

function observe({ url, version, serviceName }) {
  // etc...

  const shutdown$ = new Subject(); // <---------------- create our notifier

  return {
    create() {
      return create$
        .retry(Number.POSITIVE_INFINITY)
        .takeUntil(shutdown$); // <-------------------- take our notifier
    },
    delete({ interval = 5000, times = 0 } = {}) {
      return Observable.defer(() => {
        shutdown$.next(); // <------------------------- notify
        return delete$.retry(times);
      });
    }
  };
}

我们使用了 Observable.defer() 这样我们就可以在有人实际订阅我们 return.

的 Observable 时执行 shutdown$.next() 副作用

方法 #1

您可以在 .takeUntill 中使用 Observable,而无需通过创建中间体 Subject 实际订阅它。然后您可以订阅该主题而不是原始主题。是这样的:

const delete$ = new Subject();
...
create() {
  return create$
    .takeUntill(delete$);   
},
delete() {
  create(...).subscribe(delete$);
  return delete;
}

方法 #2

但是在你的情况下,我认为创建主题会更好,它会在调用 .delete 时通知。像这样:

const onDelete$ = new Subject();
...
create() {
  return create$
    .takeUntill(onDelete$); 
},
delete() {
  onDelete$.next();
  return ...
}