如何在 Nestjs bull 中一次执行预定义数量的作业
How to execute predefined number of jobs at one time in Nestjs bull
我有 65 个相同的命名作业要在一个队列中执行,其中有 3 个消费者实例(A、B、C)。我想一次让每个消费者执行 10 个作业。 10个执行完成后,如果队列中有大于10个的可用作业,消费者再次执行10个作业。如果不执行可用作业。
职位 1 到 65
消费者A执行1到10
消费者 B 执行 11 到 20
消费者C执行21到30
让B、A、C依次执行完毕。
然后
B - 31,32,33,.40
A - 41,42,43,.50
C - 51,52,53,.60
如果C先执行完,则C执行剩下的5个作业。
请问有什么方法可以实现吗
制作人
@Injectable()
export class SampleQueueProducerService {
constructor(@InjectQueue('sample-queue') private sampleQueue: Queue) {}
async sendDataToJob(message: string) {
await this.sampleQueue.add('job', { message });
}
}
消费者
@Processor('sample-queue')
export class SampleQueueConsumerService {
@Process({ name: 'job' })
async sampleJob(job: Job<any>) {
console.log(job.data);
}
}
3个消费者同上
您可以使用此功能获得 x 个职位(bull reference)
但是 NestJS 并不完全支持它
你可以做的是使用@OnGlobalQueueWaiting()
或@OnQueueWaiting()
获取等待的作业,将它们存储在一个数组中,一旦达到10,你就可以发送它们进行处理。
类似于:
@OnGlobalQueueWaiting()
async onGlobalWaiting(jobId: number, result: any) {
const job = await this.myQueue.getJob(jobId);
this.jobArray.push(job)
if (this.jobArray.length >= 10) {
await this.processJobs(this.jobArray);
this.jobArray = [];
}
}
async processJobs(jobs: Job[]){'
jobs.forEach(job => do something)
}
我有 65 个相同的命名作业要在一个队列中执行,其中有 3 个消费者实例(A、B、C)。我想一次让每个消费者执行 10 个作业。 10个执行完成后,如果队列中有大于10个的可用作业,消费者再次执行10个作业。如果不执行可用作业。
职位 1 到 65
消费者A执行1到10 消费者 B 执行 11 到 20 消费者C执行21到30
让B、A、C依次执行完毕。 然后
B - 31,32,33,.40 A - 41,42,43,.50 C - 51,52,53,.60
如果C先执行完,则C执行剩下的5个作业。 请问有什么方法可以实现吗
制作人
@Injectable()
export class SampleQueueProducerService {
constructor(@InjectQueue('sample-queue') private sampleQueue: Queue) {}
async sendDataToJob(message: string) {
await this.sampleQueue.add('job', { message });
}
}
消费者
@Processor('sample-queue')
export class SampleQueueConsumerService {
@Process({ name: 'job' })
async sampleJob(job: Job<any>) {
console.log(job.data);
}
}
3个消费者同上
您可以使用此功能获得 x 个职位(bull reference)
但是 NestJS 并不完全支持它
你可以做的是使用@OnGlobalQueueWaiting()
或@OnQueueWaiting()
获取等待的作业,将它们存储在一个数组中,一旦达到10,你就可以发送它们进行处理。
类似于:
@OnGlobalQueueWaiting()
async onGlobalWaiting(jobId: number, result: any) {
const job = await this.myQueue.getJob(jobId);
this.jobArray.push(job)
if (this.jobArray.length >= 10) {
await this.processJobs(this.jobArray);
this.jobArray = [];
}
}
async processJobs(jobs: Job[]){'
jobs.forEach(job => do something)
}