Bull Queue 未完成
Bull Queue is not getting completed
我是 Bull.I 的新手,根据他们的文档代码尝试过 运行 bull。流程正在启动,但我的工作没有完成,或者我不确定它是否触发了完成事件?我不确定我哪里弄错了
在下面附上我的代码
const Queue = require('bull');
const myFirstQueue = new Queue('my-first-queue',
{
redis: {
port: Config.redis.port,
host: Config.redis.host,
password: Config.redis.password
},
});
(async function ad() {
const job = await myFirstQueue.add({
foo: 'bar',
});
})();
myFirstQueue.process(async (job, data) => {
log.debug({ job, data }, 'Job data');
let progress = 0;
for (let i = 0; i < 10; i++) {
await doSomething(data);
progress += 10;
job.progress(progress).catch(err => {
log.debug({ err }, 'Job progress err');
});
log.debug({ progress }, 'After await');
}
return job;
});
const doSomething = data => {
return new Promise((resolve, reject) => {
return resolve(data);
});
};
myFirstQueue.on('completed', (job, result) => {
log.debug(`Job completed with result ${job}`);
});
myFirstQueue.on('progress', (job, progress) => {
log.debug(`Job progress with result ${job} ${progress}`);
});
我可以看到进度事件处理程序中的日志,但未触发完成事件。感谢任何帮助
您需要从流程中调用 done(),然后只有完成的事件才会触发。
myFirstQueue.process(async (job, done) => {
const data = job.data;
let progress = 0;
for (let i = 0; i < 10; i++) {
await doSomething(data);
progress += 10;
job.progress(progress).catch(err => {
log.debug({ err }, 'Job progress err');
});
}
done();
});
您可以从作业对象本身获取数据,无需向外传递数据,进程回调需要作业和data()作为参数。最后调用 data() 回调完成一个作业。如果某些验证失败,您可以传递数据和错误。更好的解释 https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queueprocess
// call done when finished
done();
// or give a error if error
done(new Error('error transcoding'));
// or pass it a result
done(null, { framerate: 29.5 /* etc... */ });
不想调用done()回调可以直接使用promise,ref-https://github.com/OptimalBits/bull#using-promises
对您的代码进行了小幅修改,希望对您有所帮助,
const Queue = require('bull');
const myFirstQueue = new Queue('my-first-queue');
(async function ad() {
const job = await myFirstQueue.add({
foo: 'bar',
});
})();
myFirstQueue.process(async (job, done) => {
log.debug( 'Job data ' + job.data);
let progress = 0;
for (let i = 0; i < 10; i++) {
await doSomething(job.data);
progress += 10;
job.progress(progress).catch(err => {
log.debug({ err }, 'Job progress err');
});
log.debug({ progress }, 'After await');
}
done(null, {done: job.data}); //we need this if we are not using promise
});
const doSomething = data => {
return new Promise((resolve, reject) => {
return resolve(data);
});
};
myFirstQueue.on('completed', (job, result) => {
log.debug(`Job completed with result ${job}`);
});
myFirstQueue.on('progress', (job, progress) => {
log.debug(`Job progress with result ${job} ${progress}`);
});
我是 Bull.I 的新手,根据他们的文档代码尝试过 运行 bull。流程正在启动,但我的工作没有完成,或者我不确定它是否触发了完成事件?我不确定我哪里弄错了
在下面附上我的代码
const Queue = require('bull');
const myFirstQueue = new Queue('my-first-queue',
{
redis: {
port: Config.redis.port,
host: Config.redis.host,
password: Config.redis.password
},
});
(async function ad() {
const job = await myFirstQueue.add({
foo: 'bar',
});
})();
myFirstQueue.process(async (job, data) => {
log.debug({ job, data }, 'Job data');
let progress = 0;
for (let i = 0; i < 10; i++) {
await doSomething(data);
progress += 10;
job.progress(progress).catch(err => {
log.debug({ err }, 'Job progress err');
});
log.debug({ progress }, 'After await');
}
return job;
});
const doSomething = data => {
return new Promise((resolve, reject) => {
return resolve(data);
});
};
myFirstQueue.on('completed', (job, result) => {
log.debug(`Job completed with result ${job}`);
});
myFirstQueue.on('progress', (job, progress) => {
log.debug(`Job progress with result ${job} ${progress}`);
});
我可以看到进度事件处理程序中的日志,但未触发完成事件。感谢任何帮助
您需要从流程中调用 done(),然后只有完成的事件才会触发。
myFirstQueue.process(async (job, done) => {
const data = job.data;
let progress = 0;
for (let i = 0; i < 10; i++) {
await doSomething(data);
progress += 10;
job.progress(progress).catch(err => {
log.debug({ err }, 'Job progress err');
});
}
done();
});
您可以从作业对象本身获取数据,无需向外传递数据,进程回调需要作业和data()作为参数。最后调用 data() 回调完成一个作业。如果某些验证失败,您可以传递数据和错误。更好的解释 https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queueprocess
// call done when finished
done();
// or give a error if error
done(new Error('error transcoding'));
// or pass it a result
done(null, { framerate: 29.5 /* etc... */ });
不想调用done()回调可以直接使用promise,ref-https://github.com/OptimalBits/bull#using-promises
对您的代码进行了小幅修改,希望对您有所帮助,
const Queue = require('bull');
const myFirstQueue = new Queue('my-first-queue');
(async function ad() {
const job = await myFirstQueue.add({
foo: 'bar',
});
})();
myFirstQueue.process(async (job, done) => {
log.debug( 'Job data ' + job.data);
let progress = 0;
for (let i = 0; i < 10; i++) {
await doSomething(job.data);
progress += 10;
job.progress(progress).catch(err => {
log.debug({ err }, 'Job progress err');
});
log.debug({ progress }, 'After await');
}
done(null, {done: job.data}); //we need this if we are not using promise
});
const doSomething = data => {
return new Promise((resolve, reject) => {
return resolve(data);
});
};
myFirstQueue.on('completed', (job, result) => {
log.debug(`Job completed with result ${job}`);
});
myFirstQueue.on('progress', (job, progress) => {
log.debug(`Job progress with result ${job} ${progress}`);
});