如何组合可观察函数的多次执行?

How to combine multiple executions of observable function?

在 firestore 中,我想要检索多个文档。每个文档都有一个唯一的 sourceAddressValue,因此对于 N 个字符串的列表,我想检索潜在的 N 个文档。

我尝试了以下操作:

getLocationAddresses(addresses: string[]) {
  const chunkSize = 10;
  let addressesChunks = [];
  if (addresses.length < chunkSize) {
      addressesChunks.push(addresses);
  } else {
    addressesChunks = [...Array(Math.ceil(addresses.length / chunkSize))].map(_ => addresses.splice(0,chunkSize));
  }
  console.log(addressesChunks);
  return of(...addressesChunks).pipe(
    mergeMap<string[], any>((x) => this.db.collection('locations', ref => 
    ref.where('sourceLocation', 'array-contains', x)).valueChanges()),
    toArray() // when this is removed, the code inside getOrders is triggered multiple times
);
  }

public getOrders() {
        this.getJSON().subscribe(data => {
            this.orders = data.orders;
            const addresses = this.orders.map(item => `${item.address}, ${item.postalCode}`);
            this.dbService.getLocationAddresses(addresses).subscribe(data => {
                console.log('data retrieved');
                console.log(data);
            });
            this.ordersRefreshed.next(this.orders);
        });
    }

在尝试执行上面的代码时,似乎没有执行完成。但是,当我在 getLocationAddresses 中注释掉 toArray() 时,订阅的函数会被多次触发,每个块都单独触发。

有谁知道如何将可观察函数的多个完成分组,以便它只触发观察者一次?

通过使用 combineLatest,函数 getLocationAddresses() returns 合并结果:

getLocationAddresses(addresses: string[]) {
      const chunkSize = 10;
      let addressesChunks: string[][] = [];
      if (addresses.length < chunkSize) {
          addressesChunks.push(addresses);
      } else {
        addressesChunks = [...Array(Math.ceil(addresses.length / chunkSize))].map(_ => addresses.splice(0,chunkSize));
      }
      console.log(addressesChunks);
      const observables = addressesChunks.map(addresses => this.db.collection('locations', ref => 
      ref.where('sourceLocation', 'in', addresses)).valueChanges());
      return combineLatest(observables)
      .pipe(map(arr => arr.reduce((acc, cur) => acc.concat(cur) ) ),);
  }

我认为这里的关键点是 valueChanges() returns 一个 Observable,它会在它引用的文档发生变化时随时通知。这基本上意味着返回的 Observable 没有完成,因此通过 mergeMap 生成的流没有完成,因此 toArray 不会触发,因为它会在源流完成时触发。

combineLatest,另一方面,在所有至少触发一次之后,只要作为参数触发的其中一个可观察值就会触发。

现在,如果只想检索文档并且对在此类文档发生任何更改时发出的 Observable 不感兴趣,那么您可以尝试将 take(1) 运算符通过管道传递到每个 mergeMap .

代码如下所示

getLocationAddresses(addresses: string[]) {
  const chunkSize = 10;
  let addressesChunks = [];
  if (addresses.length < chunkSize) {
      addressesChunks.push(addresses);
  } else {
    addressesChunks = [...Array(Math.ceil(addresses.length / chunkSize))].map(_ => addresses.splice(0,chunkSize));
  }
  console.log(addressesChunks);
  return of(...addressesChunks).pipe(
    mergeMap<string[], any>((x) => this.db.collection('locations', ref => 
      ref.where('sourceLocation', 'array-contains', x)).valueChanges().pipe(
         take(1)
      )
    ),
    toArray() // when this is removed, the code inside getOrders is triggered multiple times
);
}

take(1) 触发第一个通知,然后完成 Observable。由于上游 observables 最终将完成,因此 toArray 可以触发。

我不熟悉 Firebase rxJs 库,也许 methods/functions 包含我在上面试图描述的 take(1) 行为,但我希望我已经能够传递核心概念.

让我们首先解释一下您看到的行为:

When I comment out toArray() inside getLocationAddresses, however, the subscribed function is fired multiple times, for each chunk separately.

只要收到发射,就会触发 subscribe 中的代码。当您使用 mergeMap 时,您正在创建一个具有多个“内部可观察对象”的可观察对象。每当这些内部 observable 中的任何一个发出时,mergeMap 创建的 observable 都会发出。

因此,如果您将 n 排放量传递给 mergeMap,您可以预期至少有 n 排放量(这些内部观测值可能发射不止一次).

[with toArray] it seems that the execution is not completed.

当您使用 toArray() 时,在其源可观察对象完成之前,它不会允许任何排放;然后它发出所有接收到的发射的数组。在这种情况下,源是由 mergeMap 创建的 observable,它由多个 .valueChanges() 个 observable 组成。

但是,只要 returned 集合中的任何文档发生变化,由 firestore .valueChanges() 创建的可观察对象就会发出,但永远不会完成。由于这些 observables 是长期存在的,toArray() 永远不会发出任何东西。

This StackBlitz说明了问题。

解决方案

解决方案取决于您想要的行为。您打算对这些查询中的每一个调用一次并 return 结果(一次完成)还是您打算维护一个反应流以发出查询的最新表示?

完成

您可以使用 take(1) 强制 observable 在收到 1 次发射后完成,从而允许 toArray() 也完成 (Example - 1A):

return of(...addressesChunks).pipe(
  mergeMap(x => 
    this.db.collection('locations', ref => 
      ref.where('sourceLocation', 'array-contains', x)
    ).valueChanges().pipe(take(1))  // <-- take(1) forces completion of inner observables
  ),
  toArray()
);

您可以使用 forkJoin (Example 1B):

而不是 of/mergeMap/toArray
return forkJoin(
  addressesChunks.map(
    x => this.db.collection('locations', ref => 
      ref.where('sourceLocation', 'array-contains', x)
    ).valueChanges().pipe(take(1))
  )
);

响应式 Observable

您可以使用 combineLatest 从多个来源创建一个可观察对象,只要任何一个来源发出就发出:

return combineLatest(
  addressesChunks.map(
    x => this.db.collection('locations', ref => 
      ref.where('sourceLocation', 'array-contains', x)
    ).valueChanges()
  )
);

不过,我相信这正是 firestore .valueChages() 已经为您所做的。我知道您正在分块查询,但我很好奇为什么。

看起来您发出多个查询只是为了在您获得 return 值时将结果组合在一起。

我相信你可以简单地将所有 addresses 传递给一个

ref.where('sourceLocation', 'array-contains', addresses)

立即致电并获得结果。这样做对性能有影响吗?