使用 RxJS 处理流结束的惯用方法

Idiomatic way to handle a stream end with RxJS

流结束时我需要执行一些操作。惯用的方法是什么?

现在我使用下面的代码:

source.subscribe(undefined, undefined, function() {
  socket.send({type: 'end'});
});

有几种方法可以做到这一点:

  1. 使用运算符 subscribeOnCompleted() 而不是将空值传递给 .subscribe 方法。

  2. 使用tapOnCompleted(),和上面一样,但是它不会启动序列,你可以在序列的中途注入它。

  3. 使用 .finally() 将在序列完成时执行(正常或其他情况)。

  4. 在您的示例中,您显示了一个副作用,但如果您正在清理资源,那么使用 .using() 会更具语义,它需要一次性并将其与生命周期联系起来订阅。

为了这些看起来像:

  1. source.subscribeOnCompleted(() => socket.send({type: 'end'}));

  2. source.tapOnCompleted(() => socket.send({type: 'end'})).subscribe()

  3. source.finally(() => socket.send({type: 'end'})).subscribe()

  4. Rx.Observable.using(() => createResource(), (resource) => source).subscribe()