AMQP + NodeJS 等待通道
AMQP + NodeJS wait for channel
我在 FeathersJS 中有一项服务启动到 RabbitMQ 的连接,问题是如何在接收请求之前等待通道准备就绪:
class Service {
constructor({ amqpConnection, queueName }) {
this.amqpConnection = amqpConnection;
this.queueName = queueName;
this.replyQueueName = queueName + "Reply"
}
async create(data, params) {
new Promise(resolve => {
if (!this.channel) await this.createChannel();
channel.responseEmitter.once(correlationId, resolve);
channel.sendToQueue(this.queueName, Buffer.from(data), {
correlationId: asyncLocalStorage.getStore(),
replyTo: this.replyQueueName,
});
});
}
async createChannel() {
let connection = this.amqpConnection();
let channel = await connection.createChannel();
await channel.assertQueue(this.queueName, {
durable: false,
});
this.channel = channel;
channel.responseEmitter = new EventEmitter();
channel.responseEmitter.setMaxListeners(0);
channel.consume(
this.replyQueueName,
(msg) => {
channel.responseEmitter.emit(
msg.properties.correlationId,
msg.content.toString("utf8")
);
},
{ noAck: true }
);
}
....
}
在请求期间等待创建通道似乎是一种浪费。这应该如何“正确”完成?
Feathers 服务可以实现一个 setup method 将在服务器启动时调用(或者您自己调用 app.setup()
):
class Service {
async setup () {
await this.createChannel();
}
}
我在 FeathersJS 中有一项服务启动到 RabbitMQ 的连接,问题是如何在接收请求之前等待通道准备就绪:
class Service {
constructor({ amqpConnection, queueName }) {
this.amqpConnection = amqpConnection;
this.queueName = queueName;
this.replyQueueName = queueName + "Reply"
}
async create(data, params) {
new Promise(resolve => {
if (!this.channel) await this.createChannel();
channel.responseEmitter.once(correlationId, resolve);
channel.sendToQueue(this.queueName, Buffer.from(data), {
correlationId: asyncLocalStorage.getStore(),
replyTo: this.replyQueueName,
});
});
}
async createChannel() {
let connection = this.amqpConnection();
let channel = await connection.createChannel();
await channel.assertQueue(this.queueName, {
durable: false,
});
this.channel = channel;
channel.responseEmitter = new EventEmitter();
channel.responseEmitter.setMaxListeners(0);
channel.consume(
this.replyQueueName,
(msg) => {
channel.responseEmitter.emit(
msg.properties.correlationId,
msg.content.toString("utf8")
);
},
{ noAck: true }
);
}
....
}
在请求期间等待创建通道似乎是一种浪费。这应该如何“正确”完成?
Feathers 服务可以实现一个 setup method 将在服务器启动时调用(或者您自己调用 app.setup()
):
class Service {
async setup () {
await this.createChannel();
}
}