我怎样才能确保在 Bull 中作业不会 运行 两次?
How can I make sure that a job doesn't run twice in Bull?
我有两个函数,scheduleScan()
和 scan()
。
scan()
调用 scheduleScan()
时除了安排新的扫描 别无他法,所以 scheduleScan()
可以安排 scan()
.但是有一个问题,有些作业 运行 两次。
我想确保在任何给定时间只处理一项作业。我怎样才能做到这一点?我相信它与 done()
有关(它在 scan() 中,现在已删除)但我想不出解决方案。
公牛版本:3.12.1
重要的后期编辑: scan()
调用另一个函数,它们可能调用也可能不调用其他函数,但它们都是同步函数,所以它们只调用一个当他们自己的工作完成后,只有一条路可以前进。在"tree"结束时,我调用它,最后一个函数调用scheduleScan(),但不能同时存在两个作业运行ning。顺便说一下,每个作业都从 scan()
开始,并以 scheduleScan(stock, period, milliseconds, 'called by file.js')
结束
export function update(job) {
// does some calculations, then it may call scheduleScan() or
// it may call another function, and that could be the one calling
// scheduleScan() function.
// For instance, a function like finalize()
}
export function scan(job) {
update(job)
}
import moment from 'moment'
import stringHash from 'string-hash'
const opts = { redis: { port: 6379, host: '127.0.0.1', password: mypassword' } }
let queue = new Queue('scan', opts)
queue.process(1, (job) => {
job.progress(100).then(() => {
scan(job)
})
})
export function scheduleScan (stock, period, milliseconds, triggeredBy) {
let uniqueId = stringHash(stock + ':' + period)
queue.getJob(uniqueId).then(job => {
if (!job) {
if (milliseconds) {
queue.add({ stock, period, triggeredBy }, { delay: milliseconds, jobId: uniqueId }).then(() => {
// console.log('Added with ms: ' + stock + ' ' + period)
}).catch(err => {
if (err) {
console.log('Can not add because it exists ' + new Date())
}
})
} else {
queue.add({ stock, period, triggeredBy }, { jobId: uniqueId }).then(() => {
// console.log('Added without ms: ' + stock + ' ' + period)
}).catch(err => {
if (err) {
console.log('Can not add because it exists ' + new Date())
}
})
}
} else {
job.getState().then(state => {
if (state === 'completed') {
job.remove().then(() => {
if (milliseconds) {
queue.add({ stock, period, triggeredBy }, { delay: milliseconds, jobId: uniqueId }).then(() => {
// console.log('Added with ms: ' + stock + ' ' + period)
}).catch(err => {
if (err) {
console.log('Can not add because it exists ' + new Date())
}
})
} else {
queue.add({ stock, period, triggeredBy }, { jobId: uniqueId }).then(() => {
// console.log('Added without ms: ' + stock + ' ' + period)
}).catch(err => {
if (err) {
console.log('Can not add because it exists ' + new Date())
}
})
}
}).catch(err => {
if (err) {
// console.log(err)
}
})
}
}).catch(err => {
// console.log(err)
})
}
})
}
问题,我认为是您的 scan
函数是异步的。所以你的 job.progress
函数调用 scan
然后立即调用 done
允许队列处理另一个作业。
一个解决方案是将 done
回调作为参数传递给 scan
和 scheduleScan
函数,并在完成工作(或出错时)调用它).
另一个(更好的)解决方案是确保您总是 return 来自 scan
和 scheduleScan
的 Promise
,然后等待承诺解决,然后调用done
。如果这样做,请确保将所有承诺 return 链接到 scheduleScan
函数中。
queue.process(1, (job, done) => {
job.progress(100).then(() => {
scan(job)
.then(done)
.catch(done)
})
})
export function scan() {
// business logic
return scheduleScan()
}
// Chain all of your promise returns. Otherwise
// the scan function will return sooner and allow done to be called
// prior to the scheduleScan function finishing it's execution
export function scheduleScan() {
return queue.getJob(..).then(() => {
....
return queue.add()...
....
return queue.add(...)
.catch(e => {
console.log(e);
// propogate errors!
throw e;
})
}
扫描函数是一个异步函数。在 queue.process()
函数中,您必须等待扫描函数,然后调用 done()
回调。
export async function scan(job) {
// it does some calculations, then it creates a new schedule.
return scheduleScan(stock, period, milliseconds, "scan.js");
}
queue.process(1, (job, done) => {
job.progress(100).then(async() => {
await scan(job);
done();
});
});
export async function scheduleScan(stock, period, milliseconds, triggeredBy) {
let uniqueId = stringHash(stock + ":" + period);
try {
const existingJob = await queue.getJob(uniqueId);
if (!existingJob) {
const job = await addJob({
queue,
stock,
period,
uniqueId,
milliseconds,
triggeredBy
});
return job;
} else {
const jobState = await existingJob.getState();
if (jobState === "completed") {
await existingJob.remove();
const newJob = await addJob({
queue,
stock,
period,
uniqueId,
milliseconds,
triggeredBy
});
return newJob;
}
}
} catch (err) {
throw new Error(err);
}
}
export function addJob({ queue, stock, period, milliseconds, triggeredBy }) {
if (milliseconds) {
return queue.add(
{ stock, period, triggeredBy },
{ delay: milliseconds, jobId: uniqueId }
);
} else {
return queue.add({ stock, period, triggeredBy }, { jobId: uniqueId });
}
}
试试这个!我尝试使用 async-await 稍微重构一下代码。
我有两个函数,scheduleScan()
和 scan()
。
scan()
调用 scheduleScan()
时除了安排新的扫描 别无他法,所以 scheduleScan()
可以安排 scan()
.但是有一个问题,有些作业 运行 两次。
我想确保在任何给定时间只处理一项作业。我怎样才能做到这一点?我相信它与 done()
有关(它在 scan() 中,现在已删除)但我想不出解决方案。
公牛版本:3.12.1
重要的后期编辑: scan()
调用另一个函数,它们可能调用也可能不调用其他函数,但它们都是同步函数,所以它们只调用一个当他们自己的工作完成后,只有一条路可以前进。在"tree"结束时,我调用它,最后一个函数调用scheduleScan(),但不能同时存在两个作业运行ning。顺便说一下,每个作业都从 scan()
开始,并以 scheduleScan(stock, period, milliseconds, 'called by file.js')
export function update(job) {
// does some calculations, then it may call scheduleScan() or
// it may call another function, and that could be the one calling
// scheduleScan() function.
// For instance, a function like finalize()
}
export function scan(job) {
update(job)
}
import moment from 'moment'
import stringHash from 'string-hash'
const opts = { redis: { port: 6379, host: '127.0.0.1', password: mypassword' } }
let queue = new Queue('scan', opts)
queue.process(1, (job) => {
job.progress(100).then(() => {
scan(job)
})
})
export function scheduleScan (stock, period, milliseconds, triggeredBy) {
let uniqueId = stringHash(stock + ':' + period)
queue.getJob(uniqueId).then(job => {
if (!job) {
if (milliseconds) {
queue.add({ stock, period, triggeredBy }, { delay: milliseconds, jobId: uniqueId }).then(() => {
// console.log('Added with ms: ' + stock + ' ' + period)
}).catch(err => {
if (err) {
console.log('Can not add because it exists ' + new Date())
}
})
} else {
queue.add({ stock, period, triggeredBy }, { jobId: uniqueId }).then(() => {
// console.log('Added without ms: ' + stock + ' ' + period)
}).catch(err => {
if (err) {
console.log('Can not add because it exists ' + new Date())
}
})
}
} else {
job.getState().then(state => {
if (state === 'completed') {
job.remove().then(() => {
if (milliseconds) {
queue.add({ stock, period, triggeredBy }, { delay: milliseconds, jobId: uniqueId }).then(() => {
// console.log('Added with ms: ' + stock + ' ' + period)
}).catch(err => {
if (err) {
console.log('Can not add because it exists ' + new Date())
}
})
} else {
queue.add({ stock, period, triggeredBy }, { jobId: uniqueId }).then(() => {
// console.log('Added without ms: ' + stock + ' ' + period)
}).catch(err => {
if (err) {
console.log('Can not add because it exists ' + new Date())
}
})
}
}).catch(err => {
if (err) {
// console.log(err)
}
})
}
}).catch(err => {
// console.log(err)
})
}
})
}
问题,我认为是您的 scan
函数是异步的。所以你的 job.progress
函数调用 scan
然后立即调用 done
允许队列处理另一个作业。
一个解决方案是将 done
回调作为参数传递给 scan
和 scheduleScan
函数,并在完成工作(或出错时)调用它).
另一个(更好的)解决方案是确保您总是 return 来自 scan
和 scheduleScan
的 Promise
,然后等待承诺解决,然后调用done
。如果这样做,请确保将所有承诺 return 链接到 scheduleScan
函数中。
queue.process(1, (job, done) => {
job.progress(100).then(() => {
scan(job)
.then(done)
.catch(done)
})
})
export function scan() {
// business logic
return scheduleScan()
}
// Chain all of your promise returns. Otherwise
// the scan function will return sooner and allow done to be called
// prior to the scheduleScan function finishing it's execution
export function scheduleScan() {
return queue.getJob(..).then(() => {
....
return queue.add()...
....
return queue.add(...)
.catch(e => {
console.log(e);
// propogate errors!
throw e;
})
}
扫描函数是一个异步函数。在 queue.process()
函数中,您必须等待扫描函数,然后调用 done()
回调。
export async function scan(job) {
// it does some calculations, then it creates a new schedule.
return scheduleScan(stock, period, milliseconds, "scan.js");
}
queue.process(1, (job, done) => {
job.progress(100).then(async() => {
await scan(job);
done();
});
});
export async function scheduleScan(stock, period, milliseconds, triggeredBy) {
let uniqueId = stringHash(stock + ":" + period);
try {
const existingJob = await queue.getJob(uniqueId);
if (!existingJob) {
const job = await addJob({
queue,
stock,
period,
uniqueId,
milliseconds,
triggeredBy
});
return job;
} else {
const jobState = await existingJob.getState();
if (jobState === "completed") {
await existingJob.remove();
const newJob = await addJob({
queue,
stock,
period,
uniqueId,
milliseconds,
triggeredBy
});
return newJob;
}
}
} catch (err) {
throw new Error(err);
}
}
export function addJob({ queue, stock, period, milliseconds, triggeredBy }) {
if (milliseconds) {
return queue.add(
{ stock, period, triggeredBy },
{ delay: milliseconds, jobId: uniqueId }
);
} else {
return queue.add({ stock, period, triggeredBy }, { jobId: uniqueId });
}
}
试试这个!我尝试使用 async-await 稍微重构一下代码。