AMQPlib nodejs消费者任务并发

AMQPlib nodejs consumer task concurrency

我正在使用 amqlib 模块使用 rabbitmq 和 nodejs 构建后台任务管理系统。

有些任务确实 CPU-消耗,所以如果我启动了很多任务而我只有几个工作人员,我的服务器可能会被杀死(使用太多 CPU).

我想知道是否有一种方法可以创建一个 amqp 队列,以便我的消费者一次只能使用该队列的一个任务(即,在确认或拒绝之前,不要发送此队列的任务善待这个消费者)。 或者我应该在代码中自己处理这个问题(也许在我的工作人员中保留一个参考,我正在处理这个队列的任务并在我执行任务时拒绝这个队列的所有任务?)。

这是我的示例代码:

我正在这样创建 amqp 连接

const amqpConn = require('amqplib').connect('amqp://localhost');

我的队列名称是 tasks :

amqpConn.then((conn) => {
  return conn.createChannel();
}).then((ch) => {
  return ch.assertQueue('tasks').then((ok) => {
    ch.sendToQueue(q, new Buffer(`something to do ${i}`));
  });
}).catch(console.warn);

这是我的消费者(我想这是我应该做的工作来限制这个队列的一个并发任务):

amqpConn.then((conn) => {
  return conn.createChannel();
}).then((ch) => {
  return ch.assertQueue('tasks').then((ok) => {
    return ch.consume('tasks', (msg) => {
      if (msg !== null) {
        console.log(msg.content.toString());
        ch.ack(msg);
      }
    });
  });
}).catch(console.warn);

非常感谢!

I'm wondering if there is a way to create an amqp queue so that my consumers will only consume one task of this queue at a time

如果这是您真正需要的,那么是的,只需要一个消费者并声明该队列是独占的。这样一次就消耗了一个任务。

我想我已经完成了:

  • 为每个队列创建一个频道
  • 使用通道的 prefetch_count 来限制每个消费者的并发度

https://www.rabbitmq.com/consumer-prefetch.html