如何在 Nestjs bull 中一次执行预定义数量的作业

How to execute predefined number of jobs at one time in Nestjs bull

我有 65 个相同的命名作业要在一个队列中执行,其中有 3 个消费者实例(A、B、C)。我想一次让每个消费者执行 10 个作业。 10个执行完成后,如果队列中有大于10个的可用作业,消费者再次执行10个作业。如果不执行可用作业。

职位 1 到 65

消费者A执行1到10 消费者 B 执行 11 到 20 消费者C执行21到30

让B、A、C依次执行完毕。 然后

B - 31,32,33,.40 A - 41,42,43,.50 C - 51,52,53,.60

如果C先执行完,则C执行剩下的5个作业。 请问有什么方法可以实现吗

制作人

@Injectable()
export class SampleQueueProducerService {
  constructor(@InjectQueue('sample-queue') private sampleQueue: Queue) {}

  async sendDataToJob(message: string) {
    await this.sampleQueue.add('job', { message });
  }
}

消费者

@Processor('sample-queue')
export class SampleQueueConsumerService {
  @Process({ name: 'job' })
  async sampleJob(job: Job<any>) {
    console.log(job.data);
  }
}

3个消费者同上

您可以使用此功能获得 x 个职位(bull reference

但是 NestJS 并不完全支持它

你可以做的是使用@OnGlobalQueueWaiting()@OnQueueWaiting()获取等待的作业,将它们存储在一个数组中,一旦达到10,你就可以发送它们进行处理。

类似于:

@OnGlobalQueueWaiting()
async onGlobalWaiting(jobId: number, result: any) {
  const job = await this.myQueue.getJob(jobId);
  this.jobArray.push(job)
  if (this.jobArray.length >= 10) {
       await this.processJobs(this.jobArray);
       this.jobArray = [];
   }
}

async processJobs(jobs: Job[]){'
   jobs.forEach(job => do something)
}