RXJS - 避免内部订阅

RXJS - Avoid inner subscription

正在尝试寻找我需要的食谱,但到处都找不到。

我有这样的代码。

const Listeners = listen("data:join"); /* observable of people who want data */
const DataStream = stream("data"); /* observable of data */

我如何构建一个管道:

编辑:以内存安全的方式与此等效的是什么:

Listeners.subscribe((personListening) => {
     DataStream.subscribe((data) => personListening.send(data))
     // And until fromEvent(personListening, "data:leave") fires.
})
/* OR */
DataStream.subscribe((data) => {
    Listeners.subscribe((person) => {
        person.send(data);
    })
})

我想你想看看 rxjs 的 skip 和 take 运算符。

示例:

const data = interval(1000);

const start = timer(4500);
const end = timer(21800);

data.pipe(
  skipUntil(start),
  takeUntil(end),
).subscribe(console.log);

data 在这里是一个连续的发射流,每秒都有一个递增的数字。 startend 在定义的时间后发出一次。在控制台中,您将看到有限范围的数据流。

Stackblitz:https://stackblitz.com/edit/rxjs-ccdfif?file=index.ts

我不太确定你的可观察行为,但在一般层面上你可以使用任何 RxJS higher-order 映射运算符(如 switchMapconcatMap 等 - here) to map from one observable to another. And use RxJS takeUntil 运算符与基于另一个可观察对象的可观察对象 complete/unsubscribe 的差异。

您可以使用 takeUntil 在组件关闭时关闭所有打开的订阅。

尝试以下方法

import { Subject } from 'rxjs';
import { tap, takeUntil, switchMap } from 'rxjs/operators';

complete$ = new Subject<any>();

Listeners.pipe(
  switchMap((personListening) => {     // <-- switch to the `DataStream` observable
    return DataStream.pipe(
      tap((data) => personListening.send(data)),   // <-- call `send()` here
      takeUntil(fromEvent(personListening, 'data:leave'))
    );
  }),
  takeUntil(this.complete$)      // emit `complete$` on `ngOnDestroy` hook
).subscribe(
  _,                             // <-- do nothing on response
  (err) => console.log(err)      // <-- handle error
);

ngOnDestroy() {
  this.complete$.next();         // <-- close any open subscriptions
}