如何使用 RxJS 创建队列

How to create a queue using RxJS

我想弄清楚 queueScheduler 在 rxjs(版本 6.2.2)中是如何工作的。 我是 运行 一个快速服务器,我想做的是接受多个请求,但是函数 processMetricRequest2 应该一次只处理一个项目,但是下面的代码......当我访问 /test1 连续两次,它会调用 processMetricRequest2 即使它还没有完成(webMetrics 函数需要几秒钟)。 关于我做错了什么的任何想法?谢谢!

router.get('/test1', function(req, res, next) {
  let fn = partial(processMetricRequest2, req.query.input);
  queueScheduler.schedule(fn);
  res.render('index', { id: 1, current: {url: req.query.input}});
});
async function processMetricRequest2(url, arg) {
  console.log('--processing:', url);
  let result = await webMetrics(url);
  console.log('--FINISHED: ', url);
  return result;
}

您可能需要一个队列并按顺序处理请求。这可以使用 SubjectconcatMap 和 process 函数来完成。 queueScheduler 意思完全不同...

const { Subject } = require('rxjs');
const { concatMap } = require('rxjs/operators');

const queue = new Subject();

router.get('/test1', function(req, res, next) {
  queue.next(req.query.input);
  res.render('index', { id: 1, current: {url: req.query.input}});
});

async function processMetricRequest2(url) {
  console.log('--processing:', url);
  let result = await webMetrics(url);
  console.log('--FINISHED: ', url);
  return result;
}

queue
  .pipe(concatMap(processMetricRequest2))
  .subscribe();

此外,如果出现错误,队列将停止!订阅将终止。所以你应该确保你处理 processMetricRequest2

中的错误