如何使用 RxJS 创建队列
How to create a queue using RxJS
我想弄清楚 queueScheduler 在 rxjs(版本 6.2.2)中是如何工作的。
我是 运行 一个快速服务器,我想做的是接受多个请求,但是函数 processMetricRequest2
应该一次只处理一个项目,但是下面的代码......当我访问 /test1
连续两次,它会调用 processMetricRequest2
即使它还没有完成(webMetrics
函数需要几秒钟)。
关于我做错了什么的任何想法?谢谢!
router.get('/test1', function(req, res, next) {
let fn = partial(processMetricRequest2, req.query.input);
queueScheduler.schedule(fn);
res.render('index', { id: 1, current: {url: req.query.input}});
});
async function processMetricRequest2(url, arg) {
console.log('--processing:', url);
let result = await webMetrics(url);
console.log('--FINISHED: ', url);
return result;
}
您可能需要一个队列并按顺序处理请求。这可以使用 Subject
和 concatMap
和 process 函数来完成。 queueScheduler
意思完全不同...
const { Subject } = require('rxjs');
const { concatMap } = require('rxjs/operators');
const queue = new Subject();
router.get('/test1', function(req, res, next) {
queue.next(req.query.input);
res.render('index', { id: 1, current: {url: req.query.input}});
});
async function processMetricRequest2(url) {
console.log('--processing:', url);
let result = await webMetrics(url);
console.log('--FINISHED: ', url);
return result;
}
queue
.pipe(concatMap(processMetricRequest2))
.subscribe();
此外,如果出现错误,队列将停止!订阅将终止。所以你应该确保你处理 processMetricRequest2
中的错误
我想弄清楚 queueScheduler 在 rxjs(版本 6.2.2)中是如何工作的。
我是 运行 一个快速服务器,我想做的是接受多个请求,但是函数 processMetricRequest2
应该一次只处理一个项目,但是下面的代码......当我访问 /test1
连续两次,它会调用 processMetricRequest2
即使它还没有完成(webMetrics
函数需要几秒钟)。
关于我做错了什么的任何想法?谢谢!
router.get('/test1', function(req, res, next) {
let fn = partial(processMetricRequest2, req.query.input);
queueScheduler.schedule(fn);
res.render('index', { id: 1, current: {url: req.query.input}});
});
async function processMetricRequest2(url, arg) {
console.log('--processing:', url);
let result = await webMetrics(url);
console.log('--FINISHED: ', url);
return result;
}
您可能需要一个队列并按顺序处理请求。这可以使用 Subject
和 concatMap
和 process 函数来完成。 queueScheduler
意思完全不同...
const { Subject } = require('rxjs');
const { concatMap } = require('rxjs/operators');
const queue = new Subject();
router.get('/test1', function(req, res, next) {
queue.next(req.query.input);
res.render('index', { id: 1, current: {url: req.query.input}});
});
async function processMetricRequest2(url) {
console.log('--processing:', url);
let result = await webMetrics(url);
console.log('--FINISHED: ', url);
return result;
}
queue
.pipe(concatMap(processMetricRequest2))
.subscribe();
此外,如果出现错误,队列将停止!订阅将终止。所以你应该确保你处理 processMetricRequest2