无法使用 RabbitMQ 消息发布选项?

Can't publish options with RabbitMQ message?

我在节点代码中使用 ampq.node 访问 RabbitMQ。我正在尝试使用 publishsendToQueue 方法使用 options 参数在我发布的消息中包含一些元数据(即时间戳和内容类型)。

但是我传递给 options 的任何内容都被完全忽略了。我想我缺少一些格式或字段名称,但我找不到任何可靠的文档(除了 here 提供的文档之外,它似乎无法完成这项工作)。

下面是我的publish函数代码:

  var publish = function(queueName, message) {
    let content;
    let options = {
      persistent: true,
      noAck: false,
      timestamp: Date.now(),
      contentEncoding: 'utf-8'
    };
    if(typeof message === 'object') {
      content = new Buffer(JSON.stringify(message));
      options.contentType = 'application/json';
    }
    else if(typeof message === 'string') {
      content = new Buffer(message);
      options.contentType = 'text/plain';
    }
    else {  //message is already a buffer?
      content = message;
    }
    return Channel.sendToQueue(queueName, content, options); //Channel defined and opened elsewhere
  };

我错过了什么?

更新: 事实证明,如果您选择使用 ConfirmChannel,您 必须 提供回调函数作为最后一个参数,否则选项对象将被忽略。因此,一旦我将代码更改为以下内容,我就开始正确地看到选项:

Channel.sendToQueue(queueName, content, options, (err, result) => {...});

不知何故,我似乎无法让您的示例发布正常工作...尽管我没有发现它有什么特别的问题。我不确定为什么我无法让您的示例代码正常工作。

但我能够修改我自己的 amqplib 介绍代码的一个版本,并使其与您的选项一起工作得很好。

这是我的示例的完整代码:

// test.js file

var amqplib = require("amqplib");

var server = "amqp://test:password@localhost/test-app";

var connection, channel;

function reportError(err){
  console.log("Error happened!! OH NOES!!!!");
  console.log(err.stack);
  process.exit(1);
}

function createChannel(conn){
  console.log("creating channel");
  connection = conn;
  return connection.createChannel();
}

function sendMessage(ch){
  channel = ch;

  console.log("sending message");
  var msg = process.argv[2];
  var message = new Buffer(msg);

  var options = {
    persistent: true,
    noAck: false,
    timestamp: Date.now(),
    contentEncoding: "utf-8",
    contentType: "text/plain"
  };

  channel.sendToQueue("test.q", message, options);
  return channel.close();
}

console.log("connecting");
amqplib.connect(server)
  .then(createChannel)
  .then(sendMessage)
  .then(process.exit, reportError);

到运行这个,打开命令行并执行:

node test.js "example text message"

运行之后,您会在 "test-app" 虚拟主机的 "test.q" 队列中看到消息(假设您已创建该队列)。

这是 RMQ 管理插件生成的消息的屏幕截图:


旁注:

我建议不要使用 sendToQueue。正如我在 RabbitMQ Patterns email course / ebook 中所说:

It took a while for me to realize this, but I now see the "send to queue" feature of RabbitMQ as an anti-pattern.

Sure, it's built in to the library and protocol. And it's convenient, right? But that doesn't mean you should use it. It's one of those features that exists to make demos simple and to handle some specific scenarios. But generally speaking, "send to queue" is an anti-pattern.

When you're a message producer, you only care about sending the message to the right exchange with the right routing key. When you're a message consumer, you care about the message destination - the queue to which you are subscribed. A message may be sent to the same exchange, with the same routing key, every day, thousands of times per day. But, that doesn't mean it will arrive in the same queue every time.

As message consumers come online and go offline, they can create new queues and bindings and remove old queues and bindings. This perspective of message producers and consumers informs the nature of queues: postal boxes that can change when they need to.

我也建议不要直接使用 amqplib。这是一个很棒的库,但缺乏很多可用性。相反,在 amqplib 之上寻找一个好的库。

我更喜欢wascally, by LeanKit。它是在 amqplib 之上的更简单的抽象,并提供了许多很棒的特性和功能。

最后,如果您在设置 RMQ 和 运行 Node.js、设计您的应用程序等方面遇到其他细节问题,请查看我的 RabbitMQ For Devs当然 - 从零到英雄,速度很快。 :)

这可能对其他人有帮助,但用于内容类型的键名称是 javascript 代码中的 contentType。使用 rabbitMQ 的 web Gui,他们使用 content_type 作为键名。不同的键名来声明选项,因此请确保在正确的上下文中使用正确的键名。