多个订阅嵌套在一个订阅中
Multiple subscriptions nested into one subscription
我发现自己在尝试设置一个非常简单的 rxjs 订阅流程时感到困惑。将多个不相关的订阅嵌套到另一个订阅中。
我在 angular 申请中,在进行其他订阅之前我需要填写下一个主题。
这是我想要实现的嵌套版本。
subject0.subscribe(a => {
this.a = a;
subject1.subscribe(x => {
// Do some stuff that require this.a to exists
});
subject2.subscribe(y => {
// Do some stuff that require this.a to exists
});
});
我知道嵌套订阅不是好的做法,我尝试使用 flatMap
或 concatMap
但没有真正了解如何实现这一点。
您可以使用 concat
运算符来做到这一点。
const first = of('first').pipe(tap((value) => { /* doSomething */ }));
const second = of('second').pipe(tap((value) => { /* doSomething */ }));
const third = of('third').pipe(tap((value) => { /* doSomething */ }));
concat(first, second, third).subscribe();
这样,一切都按照定义的相同顺序链接和执行。
编辑
const first = of('first').pipe(tap(value => {
// doSomething
combineLatest(second, third).subscribe();
}));
const second = of('second').pipe(tap(value => { /* doSomething */ }));
const third = of('third').pipe(tap(value => { /* doSomething */ }));
first.subscribe();
这样,second
和 third
是 运行 异步的,一旦 first
发出。
你可以这样做:
subject$: Subject<any> = new Subject();
this.subject$.pipe(
switchMap(() => subject0),
tap(a => {
this.a = a;
}),
switchMap(() => subject1),
tap(x => {
// Do some stuff that require this.a to exists
}),
switchMap(() => subject2),
tap(y => {
// Do some stuff that require this.a to exists
})
);
如果你想触发这个,只需调用 this.subject$.next();
编辑:
这是 forkJoin 的一种可能方法,它叫平行主题。
subject$: Subject<any> = new Subject();
this.subject$.pipe(
switchMap(() => subject0),
tap(a => {
this.a = a;
}),
switchMap(
() => forkJoin(
subject1,
subject2
)),
tap([x,y] => {
// Do some stuff that require this.a to exists
})
);
将每个 Observable 的数据流分开总是一个好主意,这样您以后可以轻松地组合它们。
const first$ = this.http.get('one').pipe(
shareReplay(1)
)
shareReplay
用于使 Observable hot 因此它不会为每个订阅调用 http.get('one')
。
const second$ = this.first$.pipe(
flatMap(firstCallResult => this.http.post('second', firstCallResult))
);
const third$ = this.first$.pipe(
flatMap(firstCallResult => this.http.post('third', firstCallResult))
);
之后您可以订阅您需要的 Observables:
second$.subscribe(()=>{}) // in this case two requests will be sent - the first one (if there were no subscribes before) and the second one
third$.subscribe(() => {}) // only one request is sent - the first$ already has the response cached
如果您不想在任何地方存储 first$
的值,只需将其转换为:
this.http.get('one').pipe(
flatMap(firstCallResult => combineLatest([
this.http.post('two', firstCallResult),
this.http.post('three', firstCallResult)
])
).subscribe(([secondCallResult, thirdCallResult]) => {})
您也可以使用 BehaviorSubject
将值存储在其中:
const behaviorSubject = new BehaviorSubject<string>(null); // using BehaviorSubject does not require you to subscribe to it (because it's a hot Observable)
const first$ = behaviorSubject.pipe(
filter(Boolean), // to avoid emitting null at the beginning
flatMap(subjectValue => this.http.get('one?' + subjectValue))
)
const second$ = first$.pipe(
flatMap(firstRes => this.http.post('two', firstRes))
)
const third$ = first$.pipe(
flatMap(()=>{...})
)
behaviorSubject.next('1') // second$ and third$ will emit new values
behaviorSubject.next('2') // second$ and third$ will emit the updated values again
我发现自己在尝试设置一个非常简单的 rxjs 订阅流程时感到困惑。将多个不相关的订阅嵌套到另一个订阅中。
我在 angular 申请中,在进行其他订阅之前我需要填写下一个主题。
这是我想要实现的嵌套版本。
subject0.subscribe(a => {
this.a = a;
subject1.subscribe(x => {
// Do some stuff that require this.a to exists
});
subject2.subscribe(y => {
// Do some stuff that require this.a to exists
});
});
我知道嵌套订阅不是好的做法,我尝试使用 flatMap
或 concatMap
但没有真正了解如何实现这一点。
您可以使用 concat
运算符来做到这一点。
const first = of('first').pipe(tap((value) => { /* doSomething */ }));
const second = of('second').pipe(tap((value) => { /* doSomething */ }));
const third = of('third').pipe(tap((value) => { /* doSomething */ }));
concat(first, second, third).subscribe();
这样,一切都按照定义的相同顺序链接和执行。
编辑
const first = of('first').pipe(tap(value => {
// doSomething
combineLatest(second, third).subscribe();
}));
const second = of('second').pipe(tap(value => { /* doSomething */ }));
const third = of('third').pipe(tap(value => { /* doSomething */ }));
first.subscribe();
这样,second
和 third
是 运行 异步的,一旦 first
发出。
你可以这样做:
subject$: Subject<any> = new Subject();
this.subject$.pipe(
switchMap(() => subject0),
tap(a => {
this.a = a;
}),
switchMap(() => subject1),
tap(x => {
// Do some stuff that require this.a to exists
}),
switchMap(() => subject2),
tap(y => {
// Do some stuff that require this.a to exists
})
);
如果你想触发这个,只需调用 this.subject$.next();
编辑: 这是 forkJoin 的一种可能方法,它叫平行主题。
subject$: Subject<any> = new Subject();
this.subject$.pipe(
switchMap(() => subject0),
tap(a => {
this.a = a;
}),
switchMap(
() => forkJoin(
subject1,
subject2
)),
tap([x,y] => {
// Do some stuff that require this.a to exists
})
);
将每个 Observable 的数据流分开总是一个好主意,这样您以后可以轻松地组合它们。
const first$ = this.http.get('one').pipe(
shareReplay(1)
)
shareReplay
用于使 Observable hot 因此它不会为每个订阅调用 http.get('one')
。
const second$ = this.first$.pipe(
flatMap(firstCallResult => this.http.post('second', firstCallResult))
);
const third$ = this.first$.pipe(
flatMap(firstCallResult => this.http.post('third', firstCallResult))
);
之后您可以订阅您需要的 Observables:
second$.subscribe(()=>{}) // in this case two requests will be sent - the first one (if there were no subscribes before) and the second one
third$.subscribe(() => {}) // only one request is sent - the first$ already has the response cached
如果您不想在任何地方存储 first$
的值,只需将其转换为:
this.http.get('one').pipe(
flatMap(firstCallResult => combineLatest([
this.http.post('two', firstCallResult),
this.http.post('three', firstCallResult)
])
).subscribe(([secondCallResult, thirdCallResult]) => {})
您也可以使用 BehaviorSubject
将值存储在其中:
const behaviorSubject = new BehaviorSubject<string>(null); // using BehaviorSubject does not require you to subscribe to it (because it's a hot Observable)
const first$ = behaviorSubject.pipe(
filter(Boolean), // to avoid emitting null at the beginning
flatMap(subjectValue => this.http.get('one?' + subjectValue))
)
const second$ = first$.pipe(
flatMap(firstRes => this.http.post('two', firstRes))
)
const third$ = first$.pipe(
flatMap(()=>{...})
)
behaviorSubject.next('1') // second$ and third$ will emit new values
behaviorSubject.next('2') // second$ and third$ will emit the updated values again