如何在节点 js 上只使用来自 rabbitmq 的一条消息

How to consume just one message from rabbit mq on nodejs

我正在使用 amqp.node 库将 rabbitmq 集成到我的系统中。

但是在消费者中,我想一次只处理一条消息,然后确认该消息,然后使用队列中的下一条消息。

当前代码为:

// Consumer
open.then(function(conn) {
  var ok = conn.createChannel();
  ok = ok.then(function(ch) {
    ch.assertQueue(q);
    ch.consume(q, function(msg) {
      if (msg !== null) {
        othermodule.processMessage(msg, function(error, response){
          console.log(msg.content.toString());
          ch.ack(msg);
        });
      }
    });
  });
  return ok;
}).then(null, console.warn);

ch.consume会一次性处理通道中的所有消息,调用这里的模块的函数其他模块不会在同一时间线执行。

我想等待 othermodule 函数完成,然后再使用队列中的下一条消息。

创建模型时需要为其设置QOS。下面是我们在 C# 中的做法:

    var _model = rabbitConnection.CreateModel();
    // Configure the Quality of service for the model. Below is how what each setting means.
    // BasicQos(0="Dont send me a new message untill I’ve finshed",  _fetchSize = "Send me N messages at a time", false ="Apply to this Model only")
    _model.BasicQos(0, _fetchSize, false);
    var consumerTag = _model.BasicConsume(rabbitQueue.QueueName, false, _consumerName, queueingConsumer);

您需要设置预取值,如本例所示:

https://github.com/squaremo/amqp.node/blob/master/examples/tutorials/rpc_server.js#L22

您必须设置 QoS = 1。

ch = ...
ch.qos(1);
ch.consume(q, msg => { ... });

(javascript)

目前(2018 年),我认为 RabbitMQ 团队可以选择这样做:

https://www.rabbitmq.com/tutorials/tutorial-two-javascript.html

ch.prefetch(1);

In order to defeat that we can use the prefetch method with the value of 1. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.

跟进此处的示例:

https://www.npmjs.com/package/amqplib

// Consumer
function consumer(conn) {
  var ok = conn.createChannel(on_open);
  function on_open(err, ch) {
    if (err != null) bail(err);
    ch.assertQueue(q);
    
     // IMPORTANT
    ch.prefetch(1);

    ch.consume(q, function(msg) {
      if (msg !== null) {
        console.log(msg.content.toString());
        ch.ack(msg);
      }
    });
  }
}

参考文献:http://www.squaremobius.net/amqp.node/channel_api.html#channel_prefetch