使用 RxJS 延迟批量观察
Delay batch of observables with RxJS
我对我的数据库执行 http 请求,并注意到如果我一次发送所有请求,其中一些会出现超时错误。我想在调用之间添加延迟,这样服务器就不会过载。我正在尝试找到解决此问题的 RxJS 并且不想添加 setTimeout
.
这是我目前的工作:
let observables = [];
for(let int = 0; int < 10000; int++){
observables.push(new Observable((observer) => {
db.add(doc[int], (err, result)=>{
observer.next();
observer.complete();
})
}))
}
forkJoin(observables).subscribe(
data => {
},
error => {
console.log(error);
},
() => {
db.close();
}
);
看起来您可以使用初始 timer
来触发 http 调用。例如
timer(delayTime).pipe(combineLatest(()=>sendHttpRequest()));
这只会在计时器 observable 完成后触发 sendHttpRequest()
方法。
所以你的解决方案。您可以执行以下操作...
observables.push(
timer(delay + int).pipe(combineLatest(new Observable((observer) => {
db.add(doc[int], (err, result)=>{
observer.next();
observer.complete();
}))
}))
延迟可能从 0 开始,您可以使用循环的 int
索引增加延迟。
计时器文档:https://www.learnrxjs.io/learn-rxjs/operators/creation/timer
合并最新文档:https://www.learnrxjs.io/learn-rxjs/operators/combination/combinelatest
你确实可以用 Rxjs 很好地实现这一点。您将需要高阶可观察对象,这意味着您会将一个可观察对象发射到另一个可观察对象中,而高阶可观察对象会为您解决这个问题。
这种方法的好处是您可以轻松地 运行 X 请求 // 而无需自己管理请求池。
这是工作代码:
import { Observable, Subject } from "rxjs";
import { mergeAll, take, tap } from "rxjs/operators";
// this is just a mock to demonstrate how it'd behave if the API was
// taking 2s to reply for a call
const mockDbAddHtppCall = (id, cb) =>
setTimeout(() => {
cb(null, `some result for call "${id}"`);
}, 2000);
// I have no idea what your response type looks like so I'm assigning
// any but of course you should have your own type instead of this
type YourRequestType = any;
const NUMBER_OF_ITEMS_TO_FETCH = 10;
const calls$$ = new Subject<Observable<YourRequestType>>();
calls$$
.pipe(
mergeAll(3),
take(NUMBER_OF_ITEMS_TO_FETCH),
tap({ complete: () => console.log(`All calls are done`) })
)
.subscribe(console.log);
for (let id = 0; id < NUMBER_OF_ITEMS_TO_FETCH; id++) {
calls$$.next(
new Observable(observer => {
console.log(`Starting a request for ID "${id}""`);
mockDbAddHtppCall(id, (err, result) => {
if (err) {
observer.error(err);
} else {
observer.next(result);
observer.complete();
}
});
})
);
}
以及 Stackblitz 上的现场演示:https://stackblitz.com/edit/rxjs-z1x5m9
请打开浏览器的控制台,注意当呼叫被触发时显示的控制台日志会立即启动其中的 3 个,然后等待 1 个完成后再接另一个。
merge
并发值:
mergeAll
和 mergeMap
都允许您定义订阅的可观察对象的最大数量。 mergeAll(1)
/mergeMap(LAMBDA, 1)
基本上是 concatAll()
/concatMap(LAMBDA)
.
merge
基本上只是静态 mergeAll
以下是您可以使用它的方法:
let observables = [...Array(10000).keys()].map(intV =>
new Observable(observer => {
db.add(doc[intV], (err, result) => {
observer.next();
observer.complete();
});
})
);
const MAX_CONCURRENT_REQUESTS = 10;
merge(...observables, MAX_CONCURRENT_REQUESTS).subscribe({
next: data => {},
error: err => console.log(err),
complete: () => db.close()
});
注意:这不会对您的调用进行批处理,但它应该可以解决所描述的问题,而且它可能也比批处理快一点。
mergeMap
并发值:
也许使用 range
和 mergeMap
的 RxJS 方式稍微多一点
const MAX_CONCURRENT_REQUESTS = 10;
range(0, 10000).pipe(
mergeMap(intV =>
new Observable(observer => {
db.add(doc[intV], (err, result) => {
observer.next();
observer.complete();
});
}),
MAX_CONCURRENT_REQUESTS
)
).subscribe({
next: data => {},
error: err => console.log(err),
complete: () => db.close()
});
我对我的数据库执行 http 请求,并注意到如果我一次发送所有请求,其中一些会出现超时错误。我想在调用之间添加延迟,这样服务器就不会过载。我正在尝试找到解决此问题的 RxJS 并且不想添加 setTimeout
.
这是我目前的工作:
let observables = [];
for(let int = 0; int < 10000; int++){
observables.push(new Observable((observer) => {
db.add(doc[int], (err, result)=>{
observer.next();
observer.complete();
})
}))
}
forkJoin(observables).subscribe(
data => {
},
error => {
console.log(error);
},
() => {
db.close();
}
);
看起来您可以使用初始 timer
来触发 http 调用。例如
timer(delayTime).pipe(combineLatest(()=>sendHttpRequest()));
这只会在计时器 observable 完成后触发 sendHttpRequest()
方法。
所以你的解决方案。您可以执行以下操作...
observables.push(
timer(delay + int).pipe(combineLatest(new Observable((observer) => {
db.add(doc[int], (err, result)=>{
observer.next();
observer.complete();
}))
}))
延迟可能从 0 开始,您可以使用循环的 int
索引增加延迟。
计时器文档:https://www.learnrxjs.io/learn-rxjs/operators/creation/timer
合并最新文档:https://www.learnrxjs.io/learn-rxjs/operators/combination/combinelatest
你确实可以用 Rxjs 很好地实现这一点。您将需要高阶可观察对象,这意味着您会将一个可观察对象发射到另一个可观察对象中,而高阶可观察对象会为您解决这个问题。
这种方法的好处是您可以轻松地 运行 X 请求 // 而无需自己管理请求池。
这是工作代码:
import { Observable, Subject } from "rxjs";
import { mergeAll, take, tap } from "rxjs/operators";
// this is just a mock to demonstrate how it'd behave if the API was
// taking 2s to reply for a call
const mockDbAddHtppCall = (id, cb) =>
setTimeout(() => {
cb(null, `some result for call "${id}"`);
}, 2000);
// I have no idea what your response type looks like so I'm assigning
// any but of course you should have your own type instead of this
type YourRequestType = any;
const NUMBER_OF_ITEMS_TO_FETCH = 10;
const calls$$ = new Subject<Observable<YourRequestType>>();
calls$$
.pipe(
mergeAll(3),
take(NUMBER_OF_ITEMS_TO_FETCH),
tap({ complete: () => console.log(`All calls are done`) })
)
.subscribe(console.log);
for (let id = 0; id < NUMBER_OF_ITEMS_TO_FETCH; id++) {
calls$$.next(
new Observable(observer => {
console.log(`Starting a request for ID "${id}""`);
mockDbAddHtppCall(id, (err, result) => {
if (err) {
observer.error(err);
} else {
observer.next(result);
observer.complete();
}
});
})
);
}
以及 Stackblitz 上的现场演示:https://stackblitz.com/edit/rxjs-z1x5m9
请打开浏览器的控制台,注意当呼叫被触发时显示的控制台日志会立即启动其中的 3 个,然后等待 1 个完成后再接另一个。
merge
并发值:
mergeAll
和 mergeMap
都允许您定义订阅的可观察对象的最大数量。 mergeAll(1)
/mergeMap(LAMBDA, 1)
基本上是 concatAll()
/concatMap(LAMBDA)
.
merge
基本上只是静态 mergeAll
以下是您可以使用它的方法:
let observables = [...Array(10000).keys()].map(intV =>
new Observable(observer => {
db.add(doc[intV], (err, result) => {
observer.next();
observer.complete();
});
})
);
const MAX_CONCURRENT_REQUESTS = 10;
merge(...observables, MAX_CONCURRENT_REQUESTS).subscribe({
next: data => {},
error: err => console.log(err),
complete: () => db.close()
});
注意:这不会对您的调用进行批处理,但它应该可以解决所描述的问题,而且它可能也比批处理快一点。
mergeMap
并发值:
也许使用 range
和 mergeMap
const MAX_CONCURRENT_REQUESTS = 10;
range(0, 10000).pipe(
mergeMap(intV =>
new Observable(observer => {
db.add(doc[intV], (err, result) => {
observer.next();
observer.complete();
});
}),
MAX_CONCURRENT_REQUESTS
)
).subscribe({
next: data => {},
error: err => console.log(err),
complete: () => db.close()
});