公牛队列:当作业失败时,如何停止队列处理剩余的作业?
Bull queue: When a job fails, how to stop queue from processing remaining jobs?
我正在使用 bull 队列来处理一些作业。在当前场景中,每个作业都是某种操作。因此,只要队列中操作列表中的一个操作(作业)失败,队列就必须停止处理剩余的作业(操作)。
到目前为止我尝试了什么?
所以我试图在特定作业失败时暂停队列。接下来队列在耗尽时恢复。现在,当它恢复时,队列不会从失败的作业开始,而是选择下一个作业。
var Queue = require('bull');
let redisOptions = {
redis: { port: 6379, host: '127.0.0.1' },
limiter: { max: 1, duration: 1000 }
}
var myQueue = new Queue('Linear-Queue', redisOptions);
myQueue.process('Type-1', function (job, done) {
setTimeout(() => {
done(job.data.error);
}, job.data.time);
});
let options = {
attempts: 3,
removeOnComplete: false, // removes job from queue on success
removeOnFail: false // removes job from queue on failure
}
setTimeout(() => {
myQueue.add('Type-1', { time: 10000, description: "Type-1 One", error: false }, options);
}, 1 * 1000);
setTimeout(() => {
myQueue.add('Type-1', { time: 5000, description: "Type-1 two", error: true }, options);
}, 2 * 1000);
setTimeout(() => {
myQueue.add('Type-1', { time: 3000, description: "Type-1 three", error: false }, options);
}, 3 * 1000);
myQueue.on('completed', function (job, result) {
console.log("Completed: " + job.data.description);
});
myQueue.on('failed', async function (job, error) {
console.log("Failed: " + job.data.description);
try {
await myQueue.pause();
} catch (error) {
console.log(error);
}
});
myQueue.on('drained', async function () {
try {
await myQueue.resume();
} catch (error) {
console.log(error);
}
});
当前输出:
预期输出:如果 Type-1 two
在第 3 次尝试中成功完成。
Completed: Type-1 One
Failed: Type-1 two
Failed: Type-1 two
Completed: Type-1 two
Completed: Type-1 three
预期输出:如果 Type-1 two
在第三次尝试中也失败了。
Completed: Type-1 One
Failed: Type-1 two
Failed: Type-1 two
Failed: Type-1 two
我想要的是队列必须停止处理新作业,直到当前作业无任何失败地完成。如果出现任何失败,失败的作业必须 运行 一些 x
次。在 x+1
尝试时,它必须清除(删除所有作业)队列。那么如何在队列中实现这种线性行为。
在bull
中,不可能在失败后立即重复相同的作业,然后再在队列中拾取下一个作业。
解决方案:
- 创建新作业并将其优先级设置为小于当前作业类型的值。
- 释放失败的作业(
resolve()
或 done()
)
bull
将立即选取此新作业进行处理。
示例代码:在下面的代码中,Job-3 将失败并创建新作业,依此类推,直到“作业目的”在某个时间点成功。
var Queue = require('bull');
let redisOptions = {
redis: { port: 6379, host: '127.0.0.1' }
}
var myQueue = new Queue('Linear-Queue', redisOptions);
myQueue.process('Type-1', function (job, done) {
console.log(`Processing Job-${job.id} Attempt: ${job.attemptsMade}`);
downloadFile(job, async function (error) {
if (error) {
await repeatSameJob(job, done);
} else {
done();
}
});
});
async function repeatSameJob(job, done) {
let newJob = await myQueue.add('Type-1', job.data, { ...{ priority: 1 }, ...job.opts });
console.log(`Job-${job.id} failed. Creating new Job-${newJob.id} with highest priority for same data.`);
done(true);
}
function downloadFile(job, done) {
setTimeout(async () => {
done(job.data.error)
}, job.data.time);
}
myQueue.on('completed', function (job, result) {
console.log("Completed: Job-" + job.id);
});
myQueue.on('failed', async function (job, error) {
console.log("Failed: Job-" + job.id);
});
let options = {
removeOnComplete: true, // removes job from queue on success
removeOnFail: true // removes job from queue on failure
}
for (let i = 1; i <= 5; i++) {
let error = false;
if (i == 3) { error = true; }
setTimeout(i => {
let jobData = {
time: i * 2000,
error: error,
description: `Job-${i}`
}
myQueue.add('Type-1', jobData, options);
}, i * 2000, i);
}
输出:
我正在使用 bull 队列来处理一些作业。在当前场景中,每个作业都是某种操作。因此,只要队列中操作列表中的一个操作(作业)失败,队列就必须停止处理剩余的作业(操作)。
到目前为止我尝试了什么?
所以我试图在特定作业失败时暂停队列。接下来队列在耗尽时恢复。现在,当它恢复时,队列不会从失败的作业开始,而是选择下一个作业。
var Queue = require('bull');
let redisOptions = {
redis: { port: 6379, host: '127.0.0.1' },
limiter: { max: 1, duration: 1000 }
}
var myQueue = new Queue('Linear-Queue', redisOptions);
myQueue.process('Type-1', function (job, done) {
setTimeout(() => {
done(job.data.error);
}, job.data.time);
});
let options = {
attempts: 3,
removeOnComplete: false, // removes job from queue on success
removeOnFail: false // removes job from queue on failure
}
setTimeout(() => {
myQueue.add('Type-1', { time: 10000, description: "Type-1 One", error: false }, options);
}, 1 * 1000);
setTimeout(() => {
myQueue.add('Type-1', { time: 5000, description: "Type-1 two", error: true }, options);
}, 2 * 1000);
setTimeout(() => {
myQueue.add('Type-1', { time: 3000, description: "Type-1 three", error: false }, options);
}, 3 * 1000);
myQueue.on('completed', function (job, result) {
console.log("Completed: " + job.data.description);
});
myQueue.on('failed', async function (job, error) {
console.log("Failed: " + job.data.description);
try {
await myQueue.pause();
} catch (error) {
console.log(error);
}
});
myQueue.on('drained', async function () {
try {
await myQueue.resume();
} catch (error) {
console.log(error);
}
});
当前输出:
预期输出:如果 Type-1 two
在第 3 次尝试中成功完成。
Completed: Type-1 One
Failed: Type-1 two
Failed: Type-1 two
Completed: Type-1 two
Completed: Type-1 three
预期输出:如果 Type-1 two
在第三次尝试中也失败了。
Completed: Type-1 One
Failed: Type-1 two
Failed: Type-1 two
Failed: Type-1 two
我想要的是队列必须停止处理新作业,直到当前作业无任何失败地完成。如果出现任何失败,失败的作业必须 运行 一些 x
次。在 x+1
尝试时,它必须清除(删除所有作业)队列。那么如何在队列中实现这种线性行为。
在bull
中,不可能在失败后立即重复相同的作业,然后再在队列中拾取下一个作业。
解决方案:
- 创建新作业并将其优先级设置为小于当前作业类型的值。
- 释放失败的作业(
resolve()
或done()
) bull
将立即选取此新作业进行处理。
示例代码:在下面的代码中,Job-3 将失败并创建新作业,依此类推,直到“作业目的”在某个时间点成功。
var Queue = require('bull');
let redisOptions = {
redis: { port: 6379, host: '127.0.0.1' }
}
var myQueue = new Queue('Linear-Queue', redisOptions);
myQueue.process('Type-1', function (job, done) {
console.log(`Processing Job-${job.id} Attempt: ${job.attemptsMade}`);
downloadFile(job, async function (error) {
if (error) {
await repeatSameJob(job, done);
} else {
done();
}
});
});
async function repeatSameJob(job, done) {
let newJob = await myQueue.add('Type-1', job.data, { ...{ priority: 1 }, ...job.opts });
console.log(`Job-${job.id} failed. Creating new Job-${newJob.id} with highest priority for same data.`);
done(true);
}
function downloadFile(job, done) {
setTimeout(async () => {
done(job.data.error)
}, job.data.time);
}
myQueue.on('completed', function (job, result) {
console.log("Completed: Job-" + job.id);
});
myQueue.on('failed', async function (job, error) {
console.log("Failed: Job-" + job.id);
});
let options = {
removeOnComplete: true, // removes job from queue on success
removeOnFail: true // removes job from queue on failure
}
for (let i = 1; i <= 5; i++) {
let error = false;
if (i == 3) { error = true; }
setTimeout(i => {
let jobData = {
time: i * 2000,
error: error,
description: `Job-${i}`
}
myQueue.add('Type-1', jobData, options);
}, i * 2000, i);
}
输出: