在 AngularFire 中检索嵌套 Firestore 查询的 RxJS Observable

Retrieving an RxJS Observable of a nested Firestore query in AngularFire

我正在尝试将具有 DocumentReference 的对象的 Observable 转换为整个对象的 Observable。

我的 Firestore 查询 returns QuestDocument 的 Observable,它看起来如下(去除原始类型):

export interface QuestDocument {
    ...
    owner: DocumentReference<User>;
    ...
    collaborators?: DocumentReference<User>[];
    categories?: DocumentReference<Category>[];
}

在我的转换器中,我可以调用我的其他 Firestore 服务来检索 DocumentReferenceUserCategory 的值(平面结构,所以这里没有问题)。

我的目标是创建一个 Quest 类型的 Observable,但是我的嵌套 Observable 没有被正确解析。

export interface Quest {
  ...
  owner: User;
  ...
  collaborators?: User[];
  categories?: Category[];
}

这是我目前的情况:

doc$(docId: string): Observable<Quest> {
  return this.doc(docId).valueChanges()
  .pipe(
    mergeMap(questDoc => {
      const owner$ = this.userService.doc$(questDoc.owner.id);
      const collaborators$ = forkJoin(questDoc.collaborators.map(
        (userRef: DocumentReference) => {
          return this.userService.doc$(userRef.id)
        }
      ));
      const categories$ = forkJoin(questDoc.categories.map(
        (categoryRef: DocumentReference) => this.categoryService.doc$(categoryRef.id)
      ));
      const joined = forkJoin({
        owner: owner$,
        collaborators: collaborators$,
        categories: categories$
      });
      joined.subscribe(data => console.log(data));
      return joined.pipe(
        map(value => {
          return Object.defineProperties(questDoc, {
            qid: { value: docId },
            owner: { value: value.owner },
            collaborators: { value: value.collaborators },
            categories: { value: value.categories }
          }) as Quest;
        })
      )
    })
  );
}

所有打字都匹配,我应该收到一个 Observable<Quest>,但是当我尝试打印值时,它 returns undefined 而第二个 .pipe() 永远不会达到。

您没有正确使用 forkJoin 运算符。

完成的可观察对象将不再发射数据。把它当作一个封闭的管道。 ForkJoin 将等待所有流完成(关闭)后再发出一个数据。

如果您使用 this.afs.collection(...).doc(...).valueChanges() 获取数据,这些可观察值将保持活动状态。每次在 firestore 中更新数据时它们都会发出。

要完成它们,请使用 take(1)first()(它们将发出一次然后完成),或使用 combineLatest() 组合活动流并实时更新您的数据(不要忘记取消订阅 onDestroy 以防止任何内存泄漏)。

这是一个完整流的示例:

doc$(docId: string): Observable<Quest> {
  return this.doc(docId).valueChanges()
  .pipe(
    mergeMap(questDoc => {
      // observable will emit then complete thanks to the "take(1)"
      const owner$ = this.userService.doc$(questDoc.owner.id).pipe(take(1));
      const collaborators$ = forkJoin(questDoc.collaborators.map(
        (userRef: DocumentReference) => {
          // same thing here 
          return this.userService.doc$(userRef.id).pipe(take(1))
        }
      ));
      const categories$ = forkJoin(questDoc.categories.map(
         // and here
        (categoryRef: DocumentReference) => this.categoryService.doc$(categoryRef.id).pipe(take(1))
      ));
      const joined = forkJoin({
        owner: owner$,
        collaborators: collaborators$,
        categories: categories$
      });
      // joined.subscribe(data => console.log(data));
      return joined.pipe(
        // NEVER subscribe within a pipe, use a tap operator for side effects
        tap(data => console.log(data)),
        map(value => {
          return Object.defineProperties(questDoc, {
            qid: { value: docId },
            owner: { value: value.owner },
            collaborators: { value: value.collaborators },
            categories: { value: value.categories }
          }) as Quest;
        })
      )
    })
  );
}