如何组合可观察函数的多次执行?
How to combine multiple executions of observable function?
在 firestore 中,我想要检索多个文档。每个文档都有一个唯一的 sourceAddressValue,因此对于 N 个字符串的列表,我想检索潜在的 N 个文档。
我尝试了以下操作:
getLocationAddresses(addresses: string[]) {
const chunkSize = 10;
let addressesChunks = [];
if (addresses.length < chunkSize) {
addressesChunks.push(addresses);
} else {
addressesChunks = [...Array(Math.ceil(addresses.length / chunkSize))].map(_ => addresses.splice(0,chunkSize));
}
console.log(addressesChunks);
return of(...addressesChunks).pipe(
mergeMap<string[], any>((x) => this.db.collection('locations', ref =>
ref.where('sourceLocation', 'array-contains', x)).valueChanges()),
toArray() // when this is removed, the code inside getOrders is triggered multiple times
);
}
public getOrders() {
this.getJSON().subscribe(data => {
this.orders = data.orders;
const addresses = this.orders.map(item => `${item.address}, ${item.postalCode}`);
this.dbService.getLocationAddresses(addresses).subscribe(data => {
console.log('data retrieved');
console.log(data);
});
this.ordersRefreshed.next(this.orders);
});
}
在尝试执行上面的代码时,似乎没有执行完成。但是,当我在 getLocationAddresses 中注释掉 toArray() 时,订阅的函数会被多次触发,每个块都单独触发。
有谁知道如何将可观察函数的多个完成分组,以便它只触发观察者一次?
通过使用 combineLatest,函数 getLocationAddresses() returns 合并结果:
getLocationAddresses(addresses: string[]) {
const chunkSize = 10;
let addressesChunks: string[][] = [];
if (addresses.length < chunkSize) {
addressesChunks.push(addresses);
} else {
addressesChunks = [...Array(Math.ceil(addresses.length / chunkSize))].map(_ => addresses.splice(0,chunkSize));
}
console.log(addressesChunks);
const observables = addressesChunks.map(addresses => this.db.collection('locations', ref =>
ref.where('sourceLocation', 'in', addresses)).valueChanges());
return combineLatest(observables)
.pipe(map(arr => arr.reduce((acc, cur) => acc.concat(cur) ) ),);
}
我认为这里的关键点是 valueChanges()
returns 一个 Observable,它会在它引用的文档发生变化时随时通知。这基本上意味着返回的 Observable 没有完成,因此通过 mergeMap
生成的流没有完成,因此 toArray
不会触发,因为它会在源流完成时触发。
combineLatest
,另一方面,在所有至少触发一次之后,只要作为参数触发的其中一个可观察值就会触发。
现在,如果只想检索文档并且对在此类文档发生任何更改时发出的 Observable 不感兴趣,那么您可以尝试将 take(1)
运算符通过管道传递到每个 mergeMap
.
代码如下所示
getLocationAddresses(addresses: string[]) {
const chunkSize = 10;
let addressesChunks = [];
if (addresses.length < chunkSize) {
addressesChunks.push(addresses);
} else {
addressesChunks = [...Array(Math.ceil(addresses.length / chunkSize))].map(_ => addresses.splice(0,chunkSize));
}
console.log(addressesChunks);
return of(...addressesChunks).pipe(
mergeMap<string[], any>((x) => this.db.collection('locations', ref =>
ref.where('sourceLocation', 'array-contains', x)).valueChanges().pipe(
take(1)
)
),
toArray() // when this is removed, the code inside getOrders is triggered multiple times
);
}
take(1)
触发第一个通知,然后完成 Observable。由于上游 observables 最终将完成,因此 toArray
可以触发。
我不熟悉 Firebase rxJs 库,也许 methods/functions 包含我在上面试图描述的 take(1)
行为,但我希望我已经能够传递核心概念.
让我们首先解释一下您看到的行为:
When I comment out toArray()
inside getLocationAddresses
, however, the subscribed function is fired multiple times, for each chunk separately.
只要收到发射,就会触发 subscribe
中的代码。当您使用 mergeMap
时,您正在创建一个具有多个“内部可观察对象”的可观察对象。每当这些内部 observable 中的任何一个发出时,mergeMap
创建的 observable 都会发出。
因此,如果您将 n
排放量传递给 mergeMap
,您可以预期至少有 n
排放量(这些内部观测值可能发射不止一次).
[with toArray] it seems that the execution is not completed.
当您使用 toArray()
时,在其源可观察对象完成之前,它不会允许任何排放;然后它发出所有接收到的发射的数组。在这种情况下,源是由 mergeMap
创建的 observable,它由多个 .valueChanges()
个 observable 组成。
但是,只要 returned 集合中的任何文档发生变化,由 firestore .valueChanges()
创建的可观察对象就会发出,但永远不会完成。由于这些 observables 是长期存在的,toArray()
永远不会发出任何东西。
This StackBlitz说明了问题。
解决方案
解决方案取决于您想要的行为。您打算对这些查询中的每一个调用一次并 return 结果(一次完成)还是您打算维护一个反应流以发出查询的最新表示?
完成
您可以使用 take(1)
强制 observable 在收到 1 次发射后完成,从而允许 toArray()
也完成 (Example - 1A):
return of(...addressesChunks).pipe(
mergeMap(x =>
this.db.collection('locations', ref =>
ref.where('sourceLocation', 'array-contains', x)
).valueChanges().pipe(take(1)) // <-- take(1) forces completion of inner observables
),
toArray()
);
您可以使用 forkJoin
(Example 1B):
而不是 of
/mergeMap
/toArray
return forkJoin(
addressesChunks.map(
x => this.db.collection('locations', ref =>
ref.where('sourceLocation', 'array-contains', x)
).valueChanges().pipe(take(1))
)
);
响应式 Observable
您可以使用 combineLatest
从多个来源创建一个可观察对象,只要任何一个来源发出就发出:
return combineLatest(
addressesChunks.map(
x => this.db.collection('locations', ref =>
ref.where('sourceLocation', 'array-contains', x)
).valueChanges()
)
);
不过,我相信这正是 firestore .valueChages()
已经为您所做的。我知道您正在分块查询,但我很好奇为什么。
看起来您发出多个查询只是为了在您获得 return 值时将结果组合在一起。
我相信你可以简单地将所有 addresses
传递给一个
ref.where('sourceLocation', 'array-contains', addresses)
立即致电并获得结果。这样做对性能有影响吗?
在 firestore 中,我想要检索多个文档。每个文档都有一个唯一的 sourceAddressValue,因此对于 N 个字符串的列表,我想检索潜在的 N 个文档。
我尝试了以下操作:
getLocationAddresses(addresses: string[]) {
const chunkSize = 10;
let addressesChunks = [];
if (addresses.length < chunkSize) {
addressesChunks.push(addresses);
} else {
addressesChunks = [...Array(Math.ceil(addresses.length / chunkSize))].map(_ => addresses.splice(0,chunkSize));
}
console.log(addressesChunks);
return of(...addressesChunks).pipe(
mergeMap<string[], any>((x) => this.db.collection('locations', ref =>
ref.where('sourceLocation', 'array-contains', x)).valueChanges()),
toArray() // when this is removed, the code inside getOrders is triggered multiple times
);
}
public getOrders() {
this.getJSON().subscribe(data => {
this.orders = data.orders;
const addresses = this.orders.map(item => `${item.address}, ${item.postalCode}`);
this.dbService.getLocationAddresses(addresses).subscribe(data => {
console.log('data retrieved');
console.log(data);
});
this.ordersRefreshed.next(this.orders);
});
}
在尝试执行上面的代码时,似乎没有执行完成。但是,当我在 getLocationAddresses 中注释掉 toArray() 时,订阅的函数会被多次触发,每个块都单独触发。
有谁知道如何将可观察函数的多个完成分组,以便它只触发观察者一次?
通过使用 combineLatest,函数 getLocationAddresses() returns 合并结果:
getLocationAddresses(addresses: string[]) {
const chunkSize = 10;
let addressesChunks: string[][] = [];
if (addresses.length < chunkSize) {
addressesChunks.push(addresses);
} else {
addressesChunks = [...Array(Math.ceil(addresses.length / chunkSize))].map(_ => addresses.splice(0,chunkSize));
}
console.log(addressesChunks);
const observables = addressesChunks.map(addresses => this.db.collection('locations', ref =>
ref.where('sourceLocation', 'in', addresses)).valueChanges());
return combineLatest(observables)
.pipe(map(arr => arr.reduce((acc, cur) => acc.concat(cur) ) ),);
}
我认为这里的关键点是 valueChanges()
returns 一个 Observable,它会在它引用的文档发生变化时随时通知。这基本上意味着返回的 Observable 没有完成,因此通过 mergeMap
生成的流没有完成,因此 toArray
不会触发,因为它会在源流完成时触发。
combineLatest
,另一方面,在所有至少触发一次之后,只要作为参数触发的其中一个可观察值就会触发。
现在,如果只想检索文档并且对在此类文档发生任何更改时发出的 Observable 不感兴趣,那么您可以尝试将 take(1)
运算符通过管道传递到每个 mergeMap
.
代码如下所示
getLocationAddresses(addresses: string[]) {
const chunkSize = 10;
let addressesChunks = [];
if (addresses.length < chunkSize) {
addressesChunks.push(addresses);
} else {
addressesChunks = [...Array(Math.ceil(addresses.length / chunkSize))].map(_ => addresses.splice(0,chunkSize));
}
console.log(addressesChunks);
return of(...addressesChunks).pipe(
mergeMap<string[], any>((x) => this.db.collection('locations', ref =>
ref.where('sourceLocation', 'array-contains', x)).valueChanges().pipe(
take(1)
)
),
toArray() // when this is removed, the code inside getOrders is triggered multiple times
);
}
take(1)
触发第一个通知,然后完成 Observable。由于上游 observables 最终将完成,因此 toArray
可以触发。
我不熟悉 Firebase rxJs 库,也许 methods/functions 包含我在上面试图描述的 take(1)
行为,但我希望我已经能够传递核心概念.
让我们首先解释一下您看到的行为:
When I comment out
toArray()
insidegetLocationAddresses
, however, the subscribed function is fired multiple times, for each chunk separately.
只要收到发射,就会触发 subscribe
中的代码。当您使用 mergeMap
时,您正在创建一个具有多个“内部可观察对象”的可观察对象。每当这些内部 observable 中的任何一个发出时,mergeMap
创建的 observable 都会发出。
因此,如果您将 n
排放量传递给 mergeMap
,您可以预期至少有 n
排放量(这些内部观测值可能发射不止一次).
[with toArray] it seems that the execution is not completed.
当您使用 toArray()
时,在其源可观察对象完成之前,它不会允许任何排放;然后它发出所有接收到的发射的数组。在这种情况下,源是由 mergeMap
创建的 observable,它由多个 .valueChanges()
个 observable 组成。
但是,只要 returned 集合中的任何文档发生变化,由 firestore .valueChanges()
创建的可观察对象就会发出,但永远不会完成。由于这些 observables 是长期存在的,toArray()
永远不会发出任何东西。
This StackBlitz说明了问题。
解决方案
解决方案取决于您想要的行为。您打算对这些查询中的每一个调用一次并 return 结果(一次完成)还是您打算维护一个反应流以发出查询的最新表示?
完成
您可以使用 take(1)
强制 observable 在收到 1 次发射后完成,从而允许 toArray()
也完成 (Example - 1A):
return of(...addressesChunks).pipe(
mergeMap(x =>
this.db.collection('locations', ref =>
ref.where('sourceLocation', 'array-contains', x)
).valueChanges().pipe(take(1)) // <-- take(1) forces completion of inner observables
),
toArray()
);
您可以使用 forkJoin
(Example 1B):
of
/mergeMap
/toArray
return forkJoin(
addressesChunks.map(
x => this.db.collection('locations', ref =>
ref.where('sourceLocation', 'array-contains', x)
).valueChanges().pipe(take(1))
)
);
响应式 Observable
您可以使用 combineLatest
从多个来源创建一个可观察对象,只要任何一个来源发出就发出:
return combineLatest(
addressesChunks.map(
x => this.db.collection('locations', ref =>
ref.where('sourceLocation', 'array-contains', x)
).valueChanges()
)
);
不过,我相信这正是 firestore .valueChages()
已经为您所做的。我知道您正在分块查询,但我很好奇为什么。
看起来您发出多个查询只是为了在您获得 return 值时将结果组合在一起。
我相信你可以简单地将所有 addresses
传递给一个
ref.where('sourceLocation', 'array-contains', addresses)
立即致电并获得结果。这样做对性能有影响吗?