两个队列,一个并发 1,另一个并发 3
Two queues, one with concurrency 1 another with concurrency 3
我有这种情况:
const q1 = async.queue((task,cb) => task(cb), 1);
const q2 = async.queue((task,cb) => task(cb), 3);
基本上,情况是——如果我能锁定 q1,我就可以在 q2 中同时处理最多 3 个事情。也就是说,我程序中的大多数事情必须串行 运行,但当然事情可以并行 运行,但前提是它们首先在 q1 上获得 "lock"。然而,写这篇文章比预期的要难得多。我认为这类似于 reader/write 锁定问题。 https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock
换句话说,如果我对q1有锁,我只能处理q2的东西。实现这个的一个问题是,如果我总是必须在 q2 的关键部分获取 q1 的锁,那么我将无法在 q2 中使用更高的并发级别,它总是串行的。
我现在唯一的想法是翻转一个布尔值,像这样:
let q2HasLock = false;
if(q2hasLock === true){
q2.push(cb => ...);
}
else{
q1.push(cb => ...);
}
那我就可以用这个了:
q2.drain = () => {
q2HasLock = false;
};
但剩下的部分是我不知道how/when翻转q2HasLock
到true
。希望你能理解 problem/idea。此实现的另一个问题是,推送到 q2 的请求可能会使发送到 q1 的请求饿死,所以我可以这样做:
let q2HasLock = false, count = 0 ;
if(q2hasLock === true && count < 3){
count++;
q2.push(cb => ...);
}
else{
q1.push(cb => ...);
}
q2.drain = () => {
count = 0;
q2HasLock = false;
};
这有点棘手 - 我想尽可能简单地实现它!
好的,这应该可以满足要求,但最好找到比这更简单的方法:
const async = require('async');
const q1 = async.queue((task, cb) => task(cb), 1);
const q2 = async.queue((task, cb) => task(cb), 3);
const map = new Map();
let count = 0;
const getProm = function (q) {
if (!map.has(q)) {
map.set(q, new Promise(resolve => {
q1.push(cb => {
resolve(); // 'lock' on q1 has been obtained, nothing can use q1 until we release the lock by calling cb().
q.drain = () => {
q.drain = null;
count = 0;
map.delete(q);
cb();
};
});
}));
}
return map.get(q);
};
if(foo && count < 5){
return getProm(q2).then(v => {
q2.push(cb => {
setTimeout(cb, 140);
});
});
}
return q1.push(cb => {
setTimeout(cb, 30);
});
这是在做什么 - 一次只能 运行 一种类型的任务,但是如果任务类型是 foo,那么 3 个任务可以同时 运行。 count < 5
检查确保 q1 请求不会被 q2 请求饿死。
这里的技巧是使用承诺。解决承诺后,您仍然可以使用解决方案值调用 promise.then()
,尽管在这种情况下我们不需要该值。
为了稍微提高性能,我们可以使用布尔标志而不是承诺,但那里的逻辑要复杂得多。
我有这种情况:
const q1 = async.queue((task,cb) => task(cb), 1);
const q2 = async.queue((task,cb) => task(cb), 3);
基本上,情况是——如果我能锁定 q1,我就可以在 q2 中同时处理最多 3 个事情。也就是说,我程序中的大多数事情必须串行 运行,但当然事情可以并行 运行,但前提是它们首先在 q1 上获得 "lock"。然而,写这篇文章比预期的要难得多。我认为这类似于 reader/write 锁定问题。 https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock
换句话说,如果我对q1有锁,我只能处理q2的东西。实现这个的一个问题是,如果我总是必须在 q2 的关键部分获取 q1 的锁,那么我将无法在 q2 中使用更高的并发级别,它总是串行的。
我现在唯一的想法是翻转一个布尔值,像这样:
let q2HasLock = false;
if(q2hasLock === true){
q2.push(cb => ...);
}
else{
q1.push(cb => ...);
}
那我就可以用这个了:
q2.drain = () => {
q2HasLock = false;
};
但剩下的部分是我不知道how/when翻转q2HasLock
到true
。希望你能理解 problem/idea。此实现的另一个问题是,推送到 q2 的请求可能会使发送到 q1 的请求饿死,所以我可以这样做:
let q2HasLock = false, count = 0 ;
if(q2hasLock === true && count < 3){
count++;
q2.push(cb => ...);
}
else{
q1.push(cb => ...);
}
q2.drain = () => {
count = 0;
q2HasLock = false;
};
这有点棘手 - 我想尽可能简单地实现它!
好的,这应该可以满足要求,但最好找到比这更简单的方法:
const async = require('async');
const q1 = async.queue((task, cb) => task(cb), 1);
const q2 = async.queue((task, cb) => task(cb), 3);
const map = new Map();
let count = 0;
const getProm = function (q) {
if (!map.has(q)) {
map.set(q, new Promise(resolve => {
q1.push(cb => {
resolve(); // 'lock' on q1 has been obtained, nothing can use q1 until we release the lock by calling cb().
q.drain = () => {
q.drain = null;
count = 0;
map.delete(q);
cb();
};
});
}));
}
return map.get(q);
};
if(foo && count < 5){
return getProm(q2).then(v => {
q2.push(cb => {
setTimeout(cb, 140);
});
});
}
return q1.push(cb => {
setTimeout(cb, 30);
});
这是在做什么 - 一次只能 运行 一种类型的任务,但是如果任务类型是 foo,那么 3 个任务可以同时 运行。 count < 5
检查确保 q1 请求不会被 q2 请求饿死。
这里的技巧是使用承诺。解决承诺后,您仍然可以使用解决方案值调用 promise.then()
,尽管在这种情况下我们不需要该值。
为了稍微提高性能,我们可以使用布尔标志而不是承诺,但那里的逻辑要复杂得多。