使用 RxJS 延迟批量观察

Delay batch of observables with RxJS

我对我的数据库执行 http 请求,并注意到如果我一次发送所有请求,其中一些会出现超时错误。我想在调用之间添加延迟,这样服务器就不会过载。我正在尝试找到解决此问题的 RxJS 并且不想添加 setTimeout.

这是我目前的工作:

let observables = [];
for(let int = 0; int < 10000; int++){
   observables.push(new Observable((observer) => {
      db.add(doc[int], (err, result)=>{
         observer.next();
         observer.complete();
      })
   }))
}

forkJoin(observables).subscribe(
   data => {
   },
   error => {
      console.log(error);
   },
   () => {
      db.close();
   }
);

看起来您可以使用初始 timer 来触发 http 调用。例如

timer(delayTime).pipe(combineLatest(()=>sendHttpRequest()));

这只会在计时器 observable 完成后触发 sendHttpRequest() 方法。

所以你的解决方案。您可以执行以下操作...

   observables.push(
     timer(delay + int).pipe(combineLatest(new Observable((observer) => {
      db.add(doc[int], (err, result)=>{
         observer.next();
         observer.complete();
      }))
   }))

延迟可能从 0 开始,您可以使用循环的 int 索引增加延迟。

计时器文档:https://www.learnrxjs.io/learn-rxjs/operators/creation/timer

合并最新文档:https://www.learnrxjs.io/learn-rxjs/operators/combination/combinelatest

你确实可以用 Rxjs 很好地实现这一点。您将需要高阶可观察对象,这意味着您会将一个可观察对象发射到另一个可观察对象中,而高阶可观察对象会为您解决这个问题。

这种方法的好处是您可以轻松地 运行 X 请求 // 而无需自己管理请求池。

这是工作代码:

import { Observable, Subject } from "rxjs";
import { mergeAll, take, tap } from "rxjs/operators";

// this is just a mock to demonstrate how it'd behave if the API was
// taking 2s to reply for a call
const mockDbAddHtppCall = (id, cb) =>
  setTimeout(() => {
    cb(null, `some result for call "${id}"`);
  }, 2000);

// I have no idea what your response type looks like so I'm assigning
// any but of course you should have your own type instead of this
type YourRequestType = any;

const NUMBER_OF_ITEMS_TO_FETCH = 10;

const calls$$ = new Subject<Observable<YourRequestType>>();

calls$$
  .pipe(
    mergeAll(3),
    take(NUMBER_OF_ITEMS_TO_FETCH),
    tap({ complete: () => console.log(`All calls are done`) })
  )
  .subscribe(console.log);

for (let id = 0; id < NUMBER_OF_ITEMS_TO_FETCH; id++) {
  calls$$.next(
    new Observable(observer => {
      console.log(`Starting a request for ID "${id}""`);
      mockDbAddHtppCall(id, (err, result) => {
        if (err) {
          observer.error(err);
        } else {
          observer.next(result);
          observer.complete();
        }
      });
    })
  );
}

以及 Stackblitz 上的现场演示:https://stackblitz.com/edit/rxjs-z1x5m9

请打开浏览器的控制台,注意当呼叫被触发时显示的控制台日志会立即启动其中的 3 个,然后等待 1 个完成后再接另一个。

merge 并发值:

mergeAllmergeMap 都允许您定义订阅的可观察对象的最大数量。 mergeAll(1)/mergeMap(LAMBDA, 1) 基本上是 concatAll()/concatMap(LAMBDA).

merge 基本上只是静态 mergeAll

以下是您可以使用它的方法:

let observables = [...Array(10000).keys()].map(intV => 
  new Observable(observer => {
    db.add(doc[intV], (err, result) => {
      observer.next();
      observer.complete();
    });
  })
);

const MAX_CONCURRENT_REQUESTS = 10;

merge(...observables, MAX_CONCURRENT_REQUESTS).subscribe({
   next: data => {},
   error: err => console.log(err),
   complete: () => db.close()
});

注意:这不会对您的调用进行批处理,但它应该可以解决所描述的问题,而且它可能也比批处理快一点。


mergeMap 并发值:

也许使用 rangemergeMap

的 RxJS 方式稍微多一点
const MAX_CONCURRENT_REQUESTS = 10;

range(0, 10000).pipe(
  mergeMap(intV => 
    new Observable(observer => {
      db.add(doc[intV], (err, result) => {
        observer.next();
        observer.complete();
      });
    }),
    MAX_CONCURRENT_REQUESTS
  )
).subscribe({
   next: data => {},
   error: err => console.log(err),
   complete: () => db.close()
});