loopback 4中如何实现rabbit mq或者bull js

How to implement rabbit mq or bull js in loopback 4

我正在尝试实施后台服务,以减轻 API 调用的负载。后台任务将 运行 将文件上传到 S3 并使用 nodemailer 发送电子邮件。

按照 loopback 4 的设计模式,您可以创建一个服务提供者并将其注入您的服务或控制器。 这是一个非常基本的例子 Bull.js:

import Bull, { Queue, Job } from "bull";
import {Provider, service} from "@loopback/core";
import {get} from "@loopback/rest";

export function audioProcessor(job: Job) {
  console.log(
    `Processing audio file: ${job.data.filename}`,
    `Audio bitrate: ${job.data.bitrate}`
  );
}

export class AudioQueueProvider implements Provider<Queue> {
  async value() {
    const queue = new Bull("AudioQueue", {
      redis: { port: 6379, host: "127.0.0.1" }
    });

    queue.process(audioProcessor);
    
    return queue;
  }
}

export class AudioController {
  constructor(
    @service(AudioQueueProvider) public queue: Queue;
  ) {}

  @get('/process-audio')
  async addToQueue(): Promise<string> {
    await this.queue.add(
      {
        filename: 'process_me.wav',
        bitrate: 320,
      }
    );
    return 'Audio file added to the AudioQueue for processing';
  }
}

RabbitMQ 实现应该类似(未测试):

import { Provider, service } from "@loopback/core";
import {get} from "@loopback/rest";
import amqp from "amqplib/callback_api";

export function audioProcessor(msg: any) {
  console.log(
    `Processing audio file: ${msg.content.filename}`,
    `Audio bitrate: ${msg.content.bitrate}`
  );
}

export class AudioQueueProvider implements Provider<any> {
  async value() {
    const CONN_URL = "amqp://localhost";
    let ch = null;
    const channelName = 'AudioQueue';
    
    amqp.connect(CONN_URL, function (err, conn) {
      conn.createChannel(function (err, channel) {
        ch = channel;
      });
    });

    ch.assertQueue(channelName, {
      durable: false
    });

    ch.consume(channelName, audioProcessor, {
      noAck: true
    });
    
    return ch;
  }
}

export class AudioController {
  constructor(
    @service(AudioQueueProvider) public channel: any;
  ) {}

  @get('/process-audio')
  async addToQueue(): Promise<string> {
    await this.channel.sendToQueue(
      'AudioQueue',
      {
        filename: 'process_me.wav',
        bitrate: 320,
      }
    );
    return 'Audio file added to the AudioQueue for processing';
  }
}