为什么在订阅独立工作时我的 forkJoin 订阅没有达到?

Why is my forkJoin subscription not reached while the subscriptions work independently?

我有两个要在 rxjsforkJoin 方法中合并的可观察对象。独立执行 observables 是可行的,但是使用 forkJoin 它没有达到 pipe finalize/subscribe 方法。

My.component.ts

....
const req1 = this.userService.getUser(this.loggedInUser.userId);
const req2 = this.boardGames$;
this.subscriptions$ = forkJoin([req1, req2])
  .pipe(
    finalize(() => {
      console.log('pipe'); // Is not reached
    })
  )
  .subscribe(([obj1, obj2]) => {
    console.log('subscribe'); // Is not reached
  }, err => console.log(err), ()=>console.log('compl'));
req1.subscribe((aa) => console.log(aa)); // This is logged
req2.subscribe((bb) => console.log(bb)); // This is logged
....

我正在使用 Angularfire2 请求。我不确定这是否是一个问题,因为订阅是独立工作的。 import { AngularFirestore } from 'angularfire2/firestore';

我在这里缺少什么?

forkjoin() 需要您订阅 complete 才能真正加入他们。因此,如果您的任何一个订阅没有完成,那么 forkjoin() 将永远无法到达。如果您使用的是 firebase,您的 observables 不会完成。

如果您的订阅未完成并且您需要来自两个可观察对象的流,那么您应该尝试 combineLatest()。这需要两个活动订阅,一旦每个订阅发出一个值,就会将这些值加入一个订阅中,并继续发出值直到完成。

Here is a link for combineLatest

如果您只需要在调用 firebase 之前检查用户是否有效,请尝试 switchMap()。这会将您的 user observable 切换到您的 boardgame observable,您将只处理 boardgame observable。

forkJoin 仅在所有可观察对象完成时发出。我看不到您的其余代码(例如 boardGames$ 可观察到的内容)。很可能您使用的可观察对象在第一次发射后不会完成,这是 AngularFirestore 的预期行为,因为最常见的是您订阅了数据库 (Firebase) 中的更改。

如果您需要在某些可观察对象发出时获取最新值,请使用 combineLatest。请记住,只有当每个源可观察对象发出时,它才会开始发出。

combineLatest([
    this.userService.getUser(this.loggedInUser.userId), 
    this.boardGames$
]).subscribe(([user, boardGames]) => {
     // Don't forget to unsubscribe
});

使用 merge 如果您想将 observables 合并为一个 observable。它适用于您当前的情况。像这样:

merge(
  this.userService.getUser(this.loggedInUser.userId).pipe(map(entity => ({entity, type: 'user'}))),
  this.boardGames$.pipe(map(entity => ({entity, type: 'boardGames'})))
).subscribe(({entity, type}) => {
    // Don't forget to unsubscribe
})

使用forkJoin你可以这样实现:

const req1 = this.userService.getUser(this.loggedInUser.userId).pipe(take(1));
const req2 = this.boardGames$.pipe(take(1));
this.subscriptions$ = forkJoin([req1, req2]).subscribe(() => {
    // I will complete after both observables emits.
});

请注意,即使使用 take(1),您仍然需要处理订阅,因为如果某些可观察对象永远不会发出并且组件被销毁,您就会发生内存泄漏。 awesome library 用于处理没有样板的订阅。