如何在其中一个流依赖于另一个流的情况下使用 combineLatest?
How to use combineLatest where one of the streams is dependent of one of the others?
我有一个 Angular-解析器,可以从后端获取数据。我有以下调用要执行:
GetProject(projectId): Observable<IProject>
GetSites(projectId): Observable<ISites[]>
GetPersons(siteId): Observable<IPerson[]>
我正在尝试使用 combineLatest 但不确定如何在我的场景中使用 RxJs。我希望所有请求在解析之前完成,但 GetPersons() 应该将 GetSites() 结果中第一项的 ID 作为输入。这是怎么做到的?
您似乎只想连接几个调用:
forkJoin([GetProject(projectId), GetSites(projectId)]).pipe(
concatMap(([project, sites]) => {
const siteId = /* whatever here */;
return GetPersons(siteId);
}),
).subscribe(...);
这还取决于您是想在观察者中接收所有响应还是仅接收最后一个响应。如果您想要所有回复,那么您需要将 GetPersons
与 map
链接起来并附加前两个回复:
GetPersons(siteId).pipe(
map(persons => [project, sites, persons]),
)
创建重播主题:
const sub = new ReplaySubject(3);
然后打电话
this.getProject(1).pipe(
tap(project => sub.next(project)),
switchMap(project => this.getSites(1)),
tap(sites => sub.next(sites)),
switchMap(sites => this.getPersons(sites[0].id)),
tap(person => sub.next(person))
);
您的重播主题将包含项目作为第一个值,网站作为第二个值,人物作为第三个值。
您可以使用带有 BehaviorSubject
的 combineLatest
格式来完成。
const obs = new BehaviorSubject([]);
const add = val => obs.pipe(
take(1),
map(v => ([...v, val]))
).subscribe(v => obs.next(v));
this.getProject(1).pipe(
tap(project => add(project)),
switchMap(project => this.getSites(1)),
tap(sites => add(sites)),
switchMap(sites => this.getPersons(sites[0].id)),
tap(person => add(person))
);
这一次,返回的值将是您所有值的数组。
最后,你有复杂的语法来连接它们,没有主语。
this.getProject(1).pipe(
switchMap(project => this.getSites(1).pipe(map(sites => ([project, sites])))),
switchMap(([project, sites]) => this.getPersons(sites[0].id).pipe(map(person => ([project, sites, map])))),
);
this.project$ = this.myService.getProject(projectId);
this.sites$ = this.myService.getSites(projectId);
this.persons$ = this.sites$.pipe(
switchMap(
(sites: ISites[]) => merge(...sites.map((site: ISites) => this.myService.getPersons(site.id))),
),
); // that should result in Observable<IPerson[][]>, you likely need to flatten it
我有一个 Angular-解析器,可以从后端获取数据。我有以下调用要执行:
GetProject(projectId): Observable<IProject>
GetSites(projectId): Observable<ISites[]>
GetPersons(siteId): Observable<IPerson[]>
我正在尝试使用 combineLatest 但不确定如何在我的场景中使用 RxJs。我希望所有请求在解析之前完成,但 GetPersons() 应该将 GetSites() 结果中第一项的 ID 作为输入。这是怎么做到的?
您似乎只想连接几个调用:
forkJoin([GetProject(projectId), GetSites(projectId)]).pipe(
concatMap(([project, sites]) => {
const siteId = /* whatever here */;
return GetPersons(siteId);
}),
).subscribe(...);
这还取决于您是想在观察者中接收所有响应还是仅接收最后一个响应。如果您想要所有回复,那么您需要将 GetPersons
与 map
链接起来并附加前两个回复:
GetPersons(siteId).pipe(
map(persons => [project, sites, persons]),
)
创建重播主题:
const sub = new ReplaySubject(3);
然后打电话
this.getProject(1).pipe(
tap(project => sub.next(project)),
switchMap(project => this.getSites(1)),
tap(sites => sub.next(sites)),
switchMap(sites => this.getPersons(sites[0].id)),
tap(person => sub.next(person))
);
您的重播主题将包含项目作为第一个值,网站作为第二个值,人物作为第三个值。
您可以使用带有 BehaviorSubject
的 combineLatest
格式来完成。
const obs = new BehaviorSubject([]);
const add = val => obs.pipe(
take(1),
map(v => ([...v, val]))
).subscribe(v => obs.next(v));
this.getProject(1).pipe(
tap(project => add(project)),
switchMap(project => this.getSites(1)),
tap(sites => add(sites)),
switchMap(sites => this.getPersons(sites[0].id)),
tap(person => add(person))
);
这一次,返回的值将是您所有值的数组。
最后,你有复杂的语法来连接它们,没有主语。
this.getProject(1).pipe(
switchMap(project => this.getSites(1).pipe(map(sites => ([project, sites])))),
switchMap(([project, sites]) => this.getPersons(sites[0].id).pipe(map(person => ([project, sites, map])))),
);
this.project$ = this.myService.getProject(projectId);
this.sites$ = this.myService.getSites(projectId);
this.persons$ = this.sites$.pipe(
switchMap(
(sites: ISites[]) => merge(...sites.map((site: ISites) => this.myService.getPersons(site.id))),
),
); // that should result in Observable<IPerson[][]>, you likely need to flatten it