带有 amqplib 的 Rabbitmq inNode.js 不广播扇出消息
Rabbitmq inNode.js with amqplib not broadcasting fanout messages
这个脚本应该是rabbitmq研究意义的使者。我正在尝试 运行 具有扇出交换的消息代理。
问题是:没有广播消息。这些以负载平衡方式发送。我怎样才能像真正的扇出交换一样广播这些消息?
const amqplib = require('amqplib');
const readline = require("readline");
class Chat {
async init() {
await this.configureChannel();
await this.configureConsumer();
await this.configureCommandLine();
}
async configureChannel() {
const conn = await amqplib.connect('amqp://guest:guest@localhost:5672');
const ch = await conn.createChannel();
await ch.assertExchange("chat", "fanout", {});
const { queue } = await ch.assertQueue('messages');
await ch.bindQueue(queue, 'chat', '');
this.ch = ch;
}
async configureConsumer() {
await this.ch.consume("messages", logMessage);
function logMessage(msg) {
if (msg.content)
console.log("\n[*] Recieved message: '%s'", msg.content.toString())
}
}
async configureCommandLine() {
const commandLine = readline.createInterface({
input: process.stdin,
output: process.stdout
});
this.commandLine = commandLine;
}
async run() {
const prompt = () => {
this.commandLine.question("Message: ", async (mensagem) => {
debugger;
if (mensagem === "sair") {
return this.commandLine.close();
}
await this.ch.publish("chat", 'messages', Buffer.from(mensagem), {});
prompt();
});
}
await this.init();
console.log("\nChat\n");
prompt();
}
}
const chat = new Chat();
chat.run();
假设您 运行 按原样使用此脚本的多个实例,这里的问题是您正在为所有消费者使用一个队列。 fanout
exchange 将它收到的每条消息发送到绑定到 exchange 的所有队列。但是每个队列(如果它有多个消费者)将以循环方式工作(假设预取计数有一些限制)。要实现 fanout
行为,您需要 运行 使用不同队列的每个脚本实例。
类似的东西(未测试):
...
async configureChannel() {
const conn = await amqplib.connect('amqp://guest:guest@localhost:5672');
const ch = await conn.createChannel();
await ch.assertExchange("chat", "fanout", {});
this.ch = ch;
}
async configureConsumer() {
const { ch } = this;
const { queue } = await ch.assertQueue('', { exclusive: true });
await ch.bindQueue(queue, 'chat', '');
ch.consume(queue, logMessage);
function logMessage(msg) {
if (msg.content)
console.log("\n[*] Recieved message: '%s'", msg.content.toString())
}
}
...
这个脚本应该是rabbitmq研究意义的使者。我正在尝试 运行 具有扇出交换的消息代理。
问题是:没有广播消息。这些以负载平衡方式发送。我怎样才能像真正的扇出交换一样广播这些消息?
const amqplib = require('amqplib');
const readline = require("readline");
class Chat {
async init() {
await this.configureChannel();
await this.configureConsumer();
await this.configureCommandLine();
}
async configureChannel() {
const conn = await amqplib.connect('amqp://guest:guest@localhost:5672');
const ch = await conn.createChannel();
await ch.assertExchange("chat", "fanout", {});
const { queue } = await ch.assertQueue('messages');
await ch.bindQueue(queue, 'chat', '');
this.ch = ch;
}
async configureConsumer() {
await this.ch.consume("messages", logMessage);
function logMessage(msg) {
if (msg.content)
console.log("\n[*] Recieved message: '%s'", msg.content.toString())
}
}
async configureCommandLine() {
const commandLine = readline.createInterface({
input: process.stdin,
output: process.stdout
});
this.commandLine = commandLine;
}
async run() {
const prompt = () => {
this.commandLine.question("Message: ", async (mensagem) => {
debugger;
if (mensagem === "sair") {
return this.commandLine.close();
}
await this.ch.publish("chat", 'messages', Buffer.from(mensagem), {});
prompt();
});
}
await this.init();
console.log("\nChat\n");
prompt();
}
}
const chat = new Chat();
chat.run();
假设您 运行 按原样使用此脚本的多个实例,这里的问题是您正在为所有消费者使用一个队列。 fanout
exchange 将它收到的每条消息发送到绑定到 exchange 的所有队列。但是每个队列(如果它有多个消费者)将以循环方式工作(假设预取计数有一些限制)。要实现 fanout
行为,您需要 运行 使用不同队列的每个脚本实例。
类似的东西(未测试):
...
async configureChannel() {
const conn = await amqplib.connect('amqp://guest:guest@localhost:5672');
const ch = await conn.createChannel();
await ch.assertExchange("chat", "fanout", {});
this.ch = ch;
}
async configureConsumer() {
const { ch } = this;
const { queue } = await ch.assertQueue('', { exclusive: true });
await ch.bindQueue(queue, 'chat', '');
ch.consume(queue, logMessage);
function logMessage(msg) {
if (msg.content)
console.log("\n[*] Recieved message: '%s'", msg.content.toString())
}
}
...