amqplib sendToQueue() 没有向 RabbitMQ 发送消息
amqplib sendToQueue() is not sending messages to RabbitMQ
我正在围绕 amqplib
的 Promise API 构建一个 Typescript 包,使我可以更简单地在两个 RabbitMQ 队列之间发送消息。
这是一个 class 包内的方法,负责向 RabbitMQ 发送消息。它实际上并没有发送消息,因为我无法从 CloudAMQP 看到它们。
class PostOffice {
connection: any;
constructor(connection: any) {
this.connection = connection;
}
//...
// TMQMessage is an object that can be trivially serialized to JSON
// The Writer class holds the channel because I'm having trouble
// making Typescript class member updates stick after a method returns.
async send(writer: Writer, message: TMQMessage) {
//...RabbitMQ code begins here
let channel = writer.stageConsume[0]; // This channel is created elsewhere
// Queue is created here and I can see it in CloudAMQP.
await channel.assertQueue(message.to + "/stage/1", {
durable: true,
}); // `message.to` is a string indicating destination, e.g. 'test/hello'
// According to docs, the content should be converted to Buffer
await channel.sendToQueue(message.to + "/stage/1", Buffer.from(JSON.stringify(message)));
channel.close()
}
}
这是我制作的 class 创建的连接和通道,在这里使用:
import { Writer } from "./Writer";
export class ConnectionFactory {
url: string;
amqp: any;
constructor(url: string) {
this.url = url;
this.amqp = require('amqplib');
}
async connect(timeout: number = 2000) {
let connection = await this.amqp.connect(this.url, {
// timeout for a message acknowledge.
timeout, // Make the timeout 2s per now
})
return connection;
}
async createChannels(connection: any, writer: Writer) {
//... relevant code starts here
let stageChannels = [];
stageChannels.push(await connection.createChannel())
writer.stageConsume = stageChannels;
return writer;
}
}
}
我的连接工厂似乎工作正常,因为我可以从 CloudAMQP 的仪表板看到连接。我也可以从仪表板看到已创建(在我的代码中断言)的队列。但是,我无法让 amqplib 向 CloudAMQP 发送消息。
这是我用来调用我的包的(异步)代码:
let Cf = new ConnectionFactory(url)
let connection = await Cf.connect()
let writer = new Writer();
let message: TMQMessage = {
//... message content in JSON
}
writer = await Cf.createChannels(connection, writer)
let po = new PostOffice(connection)
await po.send(writer, message)
好像哪里不对?
可能也想等待关闭方法。但总的来说,我会推荐 amqp-client.js(也有 TypeScript 定义),https://github.com/cloudamqp/amqp-client.js/
我正在围绕 amqplib
的 Promise API 构建一个 Typescript 包,使我可以更简单地在两个 RabbitMQ 队列之间发送消息。
这是一个 class 包内的方法,负责向 RabbitMQ 发送消息。它实际上并没有发送消息,因为我无法从 CloudAMQP 看到它们。
class PostOffice {
connection: any;
constructor(connection: any) {
this.connection = connection;
}
//...
// TMQMessage is an object that can be trivially serialized to JSON
// The Writer class holds the channel because I'm having trouble
// making Typescript class member updates stick after a method returns.
async send(writer: Writer, message: TMQMessage) {
//...RabbitMQ code begins here
let channel = writer.stageConsume[0]; // This channel is created elsewhere
// Queue is created here and I can see it in CloudAMQP.
await channel.assertQueue(message.to + "/stage/1", {
durable: true,
}); // `message.to` is a string indicating destination, e.g. 'test/hello'
// According to docs, the content should be converted to Buffer
await channel.sendToQueue(message.to + "/stage/1", Buffer.from(JSON.stringify(message)));
channel.close()
}
}
这是我制作的 class 创建的连接和通道,在这里使用:
import { Writer } from "./Writer";
export class ConnectionFactory {
url: string;
amqp: any;
constructor(url: string) {
this.url = url;
this.amqp = require('amqplib');
}
async connect(timeout: number = 2000) {
let connection = await this.amqp.connect(this.url, {
// timeout for a message acknowledge.
timeout, // Make the timeout 2s per now
})
return connection;
}
async createChannels(connection: any, writer: Writer) {
//... relevant code starts here
let stageChannels = [];
stageChannels.push(await connection.createChannel())
writer.stageConsume = stageChannels;
return writer;
}
}
}
我的连接工厂似乎工作正常,因为我可以从 CloudAMQP 的仪表板看到连接。我也可以从仪表板看到已创建(在我的代码中断言)的队列。但是,我无法让 amqplib 向 CloudAMQP 发送消息。
这是我用来调用我的包的(异步)代码:
let Cf = new ConnectionFactory(url)
let connection = await Cf.connect()
let writer = new Writer();
let message: TMQMessage = {
//... message content in JSON
}
writer = await Cf.createChannels(connection, writer)
let po = new PostOffice(connection)
await po.send(writer, message)
好像哪里不对?
可能也想等待关闭方法。但总的来说,我会推荐 amqp-client.js(也有 TypeScript 定义),https://github.com/cloudamqp/amqp-client.js/