使用 bull 的作业处理微服务

Job processing microservices using bull

我想使用 node.js bull 处理预定作业。基本上我有两个处理器来处理两种类型的工作。有一个配置器配置将使用 cron 添加到 bull 队列的作业。

调度程序将在一个微服务中,每个处理器将是一个单独的微服务。所以我将拥有 3 个微服务。

我的问题是我使用的 bull 模式是否正确?

index.js

const Queue = require('bull');

const fetchQueue = new Queue('MyScheduler');
fetchQueue.add("fetcher", {name: "earthQuakeAlert"}, {repeat: {cron: '1-59/2 * * * *'}, removeOnComplete: true});
fetchQueue.add("fetcher", {name: "weatherAlert"}, {repeat: {cron: '3-59/3 * * * *'}, removeOnComplete: true});

处理器-configurator.js

const Queue=require('bull');

const scheduler = new Queue("MyScheduler");
scheduler.process("processor", __dirname + "/alert-processor");

fetcher-configurator.js

const Queue=require('bull');

const scheduler = new Queue("MyScheduler");
scheduler.process("fetcher", __dirname+"/fetcher");

fetcher.js

const Queue = require('bull');
const moment = require('moment');

module.exports = function (job) {
    const scheduler = new Queue('MyScheduler');
    console.log("Insider processor ", job.data, moment().format("YYYY-MM-DD hh:mm:ss"));
    scheduler.add('processor', {'name': 'Email needs to be sent'}, {removeOnComplete: true});
    return Promise.resolve()
};

警报-processor.js

const Queue = require('bull');
const moment = require('moment');

module.exports = function (job) {
    const scheduler = new Queue('MyScheduler');
    console.log("Insider processor ", job.data, moment().format("YYYY-MM-DD hh:mm:ss"));
    scheduler.add('processor', {'name': 'Email needs to be sent'}, {removeOnComplete: true});
    return Promise.resolve()
};

会有3个微服务-

  1. 节点index.js
  2. 节点获取器-configurator.js
  3. 节点处理器-configurator.js

我看到公牛的行为不一致。有时我收到错误 缺少作业类型的进程处理程序

引用自己的话希望这对其他人有帮助

This is because both workers use the same queue. Worker tries to get next job from queue, receives a job with wrong type (eg "fetcher" instead of "processor") and fails because it knows how to handle "processor" and doesn't know what to do with "fetcher". Bull doesn't allow you to take only compatible jobs from queue, both workers should be able to process all types of jobs. The simplest solution would be to use two different queues, one for processors and one for fetchers. Then you can remove names from jobs and processors, it won't be needed anymore since name is defined by the queue.

https://github.com/OptimalBits/bull/issues/1481

公牛:

到期-queue.js

import Queue from 'bull';
import { ExpirationCompletePublisher } from '../events/publishers/expiration-complete-publisher';
import { natsWrapper } from '../nats-wrapper';
interface Payload {
  orderId: string;
}

const expirationQueue = new Queue<Payload>('order:expiration', {
  redis: {
    host: process.env.REDIS_HOST, 
  },
});

expirationQueue.process(async (job) => {
  console.log('Expiries order id', job.data.orderId);
  new ExpirationCompletePublisher(natsWrapper.client).publish({
    orderId: job.data.orderId,
  });
});

export { expirationQueue };

promotionEndQueue.js

import Queue from 'bull';
import { PromotionEndedPublisher } from '../events/publishers/promotion-ended-publisher';
import { natsWrapper } from '../nats-wrapper';
interface Payload {
  promotionId: string;
}

const promotionEndQueue = new Queue<Payload>('promotions:end', {
  redis: {
    host: process.env.REDIS_HOST, // look at expiration-depl.yaml
  },
});

promotionEndQueue.process(async (job) => {
  console.log('Expiries promotion id', job.data.promotionId);
  new PromotionEndedPublisher(natsWrapper.client).publish({
    promotionId: job.data.promotionId,
  });
});

export { promotionEndQueue };

已创建订单-listener.js

import { Listener, OrderCreatedEvent, Subjects } from '@your-lib/common';
import { queueGroupName } from './queue-group-name';
import { Message } from 'node-nats-streaming';
import { expirationQueue } from '../../queues/expiration-queue';
export class OrderCreatedListener extends Listener<OrderCreatedEvent> {
  subject: Subjects.OrderCreated = Subjects.OrderCreated;
  queueGroupName = queueGroupName;

  async onMessage(data: OrderCreatedEvent['data'], msg: Message) {
    // delay = expiredTime - currentTime
    const delay = new Date(data.expiresAt).getTime() - new Date().getTime();
    // console.log("delay", delay)
    await expirationQueue.add(
      {
        orderId: data.id,
      },
      {
        delay,
      }
    );

    msg.ack();
  }
}

促销开始-listener.js

import {
  Listener,
  PromotionStartedEvent,
  Subjects,
} from '@your-lib/common';
import { queueGroupName } from './queue-group-name';
import { Message } from 'node-nats-streaming';
import { promotionEndQueue } from '../../queues/promotions-end-queue';
export class PromotionStartedListener extends Listener<PromotionStartedEvent> {
  subject: Subjects.PromotionStarted = Subjects.PromotionStarted;
  queueGroupName = queueGroupName;

  async onMessage(data: PromotionStartedEvent['data'], msg: Message) {
    // delay = expiredTime - currentTime
    const delay = new Date(data.endTime).getTime() - new Date().getTime();

    // console.log("delay", delay)
    await promotionEndQueue.add(
      {
        promotionId: data.id,
      },
      {
        delay,
      }
    );

    msg.ack();
  }
}