RXJS - 避免内部订阅
RXJS - Avoid inner subscription
正在尝试寻找我需要的食谱,但到处都找不到。
我有这样的代码。
const Listeners = listen("data:join"); /* observable of people who want data */
const DataStream = stream("data"); /* observable of data */
我如何构建一个管道:
- 对于每个加入我的听众流的
person
,我都会为他们订阅数据流。
- 触发
data:leave
事件的每个人都会取消订阅流
- DataStream 的一长串管道运算符在引擎盖下只触发一次,而不是为每个加入的人触发一次。
编辑:以内存安全的方式与此等效的是什么:
Listeners.subscribe((personListening) => {
DataStream.subscribe((data) => personListening.send(data))
// And until fromEvent(personListening, "data:leave") fires.
})
/* OR */
DataStream.subscribe((data) => {
Listeners.subscribe((person) => {
person.send(data);
})
})
我想你想看看 rxjs 的 skip 和 take 运算符。
示例:
const data = interval(1000);
const start = timer(4500);
const end = timer(21800);
data.pipe(
skipUntil(start),
takeUntil(end),
).subscribe(console.log);
data
在这里是一个连续的发射流,每秒都有一个递增的数字。 start
和 end
在定义的时间后发出一次。在控制台中,您将看到有限范围的数据流。
Stackblitz:https://stackblitz.com/edit/rxjs-ccdfif?file=index.ts
我不太确定你的可观察行为,但在一般层面上你可以使用任何 RxJS higher-order 映射运算符(如 switchMap
、concatMap
等 - here) to map from one observable to another. And use RxJS takeUntil
运算符与基于另一个可观察对象的可观察对象 complete/unsubscribe 的差异。
您可以使用 takeUntil
在组件关闭时关闭所有打开的订阅。
尝试以下方法
import { Subject } from 'rxjs';
import { tap, takeUntil, switchMap } from 'rxjs/operators';
complete$ = new Subject<any>();
Listeners.pipe(
switchMap((personListening) => { // <-- switch to the `DataStream` observable
return DataStream.pipe(
tap((data) => personListening.send(data)), // <-- call `send()` here
takeUntil(fromEvent(personListening, 'data:leave'))
);
}),
takeUntil(this.complete$) // emit `complete$` on `ngOnDestroy` hook
).subscribe(
_, // <-- do nothing on response
(err) => console.log(err) // <-- handle error
);
ngOnDestroy() {
this.complete$.next(); // <-- close any open subscriptions
}
正在尝试寻找我需要的食谱,但到处都找不到。
我有这样的代码。
const Listeners = listen("data:join"); /* observable of people who want data */
const DataStream = stream("data"); /* observable of data */
我如何构建一个管道:
- 对于每个加入我的听众流的
person
,我都会为他们订阅数据流。 - 触发
data:leave
事件的每个人都会取消订阅流 - DataStream 的一长串管道运算符在引擎盖下只触发一次,而不是为每个加入的人触发一次。
编辑:以内存安全的方式与此等效的是什么:
Listeners.subscribe((personListening) => {
DataStream.subscribe((data) => personListening.send(data))
// And until fromEvent(personListening, "data:leave") fires.
})
/* OR */
DataStream.subscribe((data) => {
Listeners.subscribe((person) => {
person.send(data);
})
})
我想你想看看 rxjs 的 skip 和 take 运算符。
示例:
const data = interval(1000);
const start = timer(4500);
const end = timer(21800);
data.pipe(
skipUntil(start),
takeUntil(end),
).subscribe(console.log);
data
在这里是一个连续的发射流,每秒都有一个递增的数字。 start
和 end
在定义的时间后发出一次。在控制台中,您将看到有限范围的数据流。
Stackblitz:https://stackblitz.com/edit/rxjs-ccdfif?file=index.ts
我不太确定你的可观察行为,但在一般层面上你可以使用任何 RxJS higher-order 映射运算符(如 switchMap
、concatMap
等 - here) to map from one observable to another. And use RxJS takeUntil
运算符与基于另一个可观察对象的可观察对象 complete/unsubscribe 的差异。
您可以使用 takeUntil
在组件关闭时关闭所有打开的订阅。
尝试以下方法
import { Subject } from 'rxjs';
import { tap, takeUntil, switchMap } from 'rxjs/operators';
complete$ = new Subject<any>();
Listeners.pipe(
switchMap((personListening) => { // <-- switch to the `DataStream` observable
return DataStream.pipe(
tap((data) => personListening.send(data)), // <-- call `send()` here
takeUntil(fromEvent(personListening, 'data:leave'))
);
}),
takeUntil(this.complete$) // emit `complete$` on `ngOnDestroy` hook
).subscribe(
_, // <-- do nothing on response
(err) => console.log(err) // <-- handle error
);
ngOnDestroy() {
this.complete$.next(); // <-- close any open subscriptions
}