Observable 非空时默认
Observable defaulting when not empty
我有一些代码从数据库中读取,迭代每一行数据并对其执行一些逻辑,然后创建一个可观察对象,然后写入数据库,将其添加到一个数组(创建一个可观察对象数组), 因此当通过 forkJoin
订阅 observables 数组时,所有必要的数据都会写入数据库。
在数组中的 observables 数量变得相当大之前,这似乎工作得很好。行数可以是 0-6000 之间的任何值,因此数组的大小可以增长到这个值。当它确实达到这个大小时,可观察对象不再写入数据库,而是 returns 来自 defaultIfEmpty
的默认值。我很困惑为什么它在较小数量的可观察对象下正常工作,但在较大数量时突然变空...
结合代码示例可能更清楚一些
function writeToDB() {
// rows taken from the database, n = 0..6000
data = []
// array of observables
observables = []
for (const row of data) {
if (row.age > 20) {
// websocket between service and database, returns an observable
const observable = websocket.put(row).pipe(
o$.catchError((err) => {
return r$.of(err)
}),
o$.defaultIfEmpty({
success: true,
status: 200
})
);
observables.push(observable);
}
}
return forkJoin([...observables]);
}
使用此示例在订阅时工作得非常好,除了大型数据集,其中数组 observables
的长度约为 5000。那时它开始 return defaultIfEmpty
值 { success: true, status: 200 }
我不知道为什么...任何帮助或建议将不胜感激。
从您在此处显示的内容看不清楚。尽管如此,如果这适用于较少数量的调用,那么 websocket
很有可能在这些数字上表现出一些奇怪的行为。
值得尝试的方法可能是限制您的 websocket 调用的并发性。
function writeToDB(data) {
// data contains rows taken from the database, n = 0..6000
return from(data).pipe(
filter(row => row.age > 20),
map(row => websocket.put(row).pipe(
catchError(err => of(err)),
// last makes sure that mergeAll behaves like forkJoin
last(undefined, {
success: true,
status: 200
})
)),
// mergeAll lets you choose how many can run concurrently
// for example, at most 50 websocket calls are made at
// once here
mergeAll(50),
toArray()
);
}
在这种情况下,我更喜欢 map
、 mergeAll
而不是 mergeMap
(因为我认为您不太可能错过它的并发方面),但您可以使用.
function writeToDB(data) {
// data contains rows taken from the database, n = 0..6000
return from(data).pipe(
filter(row => row.age > 20),
mergeMap(row => websocket.put(row).pipe(
catchError(err => of(err)),
// last makes sure that mergeMap behaves like forkJoin
last(undefined, {
success: true,
status: 200
})
), 50), // <- sneaky! ;)
toArray()
);
}
我有一些代码从数据库中读取,迭代每一行数据并对其执行一些逻辑,然后创建一个可观察对象,然后写入数据库,将其添加到一个数组(创建一个可观察对象数组), 因此当通过 forkJoin
订阅 observables 数组时,所有必要的数据都会写入数据库。
在数组中的 observables 数量变得相当大之前,这似乎工作得很好。行数可以是 0-6000 之间的任何值,因此数组的大小可以增长到这个值。当它确实达到这个大小时,可观察对象不再写入数据库,而是 returns 来自 defaultIfEmpty
的默认值。我很困惑为什么它在较小数量的可观察对象下正常工作,但在较大数量时突然变空...
结合代码示例可能更清楚一些
function writeToDB() {
// rows taken from the database, n = 0..6000
data = []
// array of observables
observables = []
for (const row of data) {
if (row.age > 20) {
// websocket between service and database, returns an observable
const observable = websocket.put(row).pipe(
o$.catchError((err) => {
return r$.of(err)
}),
o$.defaultIfEmpty({
success: true,
status: 200
})
);
observables.push(observable);
}
}
return forkJoin([...observables]);
}
使用此示例在订阅时工作得非常好,除了大型数据集,其中数组 observables
的长度约为 5000。那时它开始 return defaultIfEmpty
值 { success: true, status: 200 }
我不知道为什么...任何帮助或建议将不胜感激。
从您在此处显示的内容看不清楚。尽管如此,如果这适用于较少数量的调用,那么 websocket
很有可能在这些数字上表现出一些奇怪的行为。
值得尝试的方法可能是限制您的 websocket 调用的并发性。
function writeToDB(data) {
// data contains rows taken from the database, n = 0..6000
return from(data).pipe(
filter(row => row.age > 20),
map(row => websocket.put(row).pipe(
catchError(err => of(err)),
// last makes sure that mergeAll behaves like forkJoin
last(undefined, {
success: true,
status: 200
})
)),
// mergeAll lets you choose how many can run concurrently
// for example, at most 50 websocket calls are made at
// once here
mergeAll(50),
toArray()
);
}
在这种情况下,我更喜欢 map
、 mergeAll
而不是 mergeMap
(因为我认为您不太可能错过它的并发方面),但您可以使用.
function writeToDB(data) {
// data contains rows taken from the database, n = 0..6000
return from(data).pipe(
filter(row => row.age > 20),
mergeMap(row => websocket.put(row).pipe(
catchError(err => of(err)),
// last makes sure that mergeMap behaves like forkJoin
last(undefined, {
success: true,
status: 200
})
), 50), // <- sneaky! ;)
toArray()
);
}