amqplib:套接字在打开握手期间突然关闭
amqplib: Socket closed abruptly during opening handshake
我想做什么
我尝试创建 rabbit-mq 发布者和订阅者。它按预期工作,直到我尝试重新启动我的 rabbit-mq 服务器。
什么有效
我使用 rabbitmq:3-management
docker 图像、ampqlib 5.3
和 Node.js 11.10.0
来制作这个简单的程序:
const q = 'tasks';
const { execSync } = require("child_process");
const amqplib = require("amqplib");
function createChannel() {
return amqplib.connect("amqp://root:toor@0.0.0.0:5672/")
.then((conn) => conn.createChannel());
}
Promise.all([createChannel(), createChannel()])
.then(async (channels) => {
const [publisherChannel, consumerChannel] = channels;
// publisher
await publisherChannel.assertQueue(q).then(function(ok) {
return publisherChannel.sendToQueue(q, Buffer.from("something to do"));
});
// consumer
await consumerChannel.assertQueue(q).then(function(ok) {
return consumerChannel.consume(q, function(msg) {
if (msg !== null) {
console.log(msg.content.toString());
consumerChannel.ack(msg);
}
});
});
})
.catch(console.warn);
所以,首先,我制作了两个频道。一个作为发布者,一个作为消费者。
发布者向 tasks
队列发送 something to do
消息。
消费者然后捕获消息并使用 console.log
将其打印到屏幕上。
它按预期工作。
什么不起作用
第一次尝试
const q = 'tasks';
const { execSync } = require("child_process");
const amqplib = require("amqplib");
function createChannel() {
return amqplib.connect("amqp://root:toor@0.0.0.0:5672/")
.then((conn) => conn.createChannel());
}
Promise.all([createChannel(), createChannel()])
.then((channels) => {
// Let's say rabbitmq is down, and then up again
execSync("docker stop rabbitmq");
execSync("docker start rabbitmq");
return channels;
})
.then(async (channels) => {
const [publisherChannel, consumerChannel] = channels;
// publisher
await publisherChannel.assertQueue(q).then(function(ok) {
return publisherChannel.sendToQueue(q, Buffer.from("something to do"));
});
// consumer
await consumerChannel.assertQueue(q).then(function(ok) {
return consumerChannel.consume(q, function(msg) {
if (msg !== null) {
console.log(msg.content.toString());
consumerChannel.ack(msg);
}
});
});
})
.catch(console.warn);
与我之前的尝试类似,但这次我尝试在继续之前停止并启动 rabbit-mq 容器(重新启动服务器)。
它不起作用,我收到此错误:
{ Error: Socket closed abruptly during opening handshake
at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)
at Socket.emit (events.js:202:15)
at endReadableNT (_stream_readable.js:1129:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17)
cause:
Error: Socket closed abruptly during opening handshake
at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)
at Socket.emit (events.js:202:15)
at endReadableNT (_stream_readable.js:1129:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17),
isOperational: true }
[guldan@draenor labs]$ node --version
v11.10.0
[guldan@draenor labs]$ docker start rabbitmq && node test.js
rabbitmq
{ Error: Channel ended, no reply will be forthcoming
at rej (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:195:7)
at Channel.C._rejectPending (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:197:28)
at Channel.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:165:8)
at Connection.C._closeChannels (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:394:18)
at Connection.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:401:8)
at Object.accept (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:96:18)
at Connection.mainAccept [as accept] (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:64:33)
at Socket.go (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:478:48)
at Socket.emit (events.js:197:13)
at emitReadable_ (_stream_readable.js:539:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17)
cause:
Error: Channel ended, no reply will be forthcoming
at rej (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:195:7)
at Channel.C._rejectPending (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:197:28)
at Channel.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:165:8)
at Connection.C._closeChannels (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:394:18)
at Connection.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:401:8)
at Object.accept (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:96:18)
at Connection.mainAccept [as accept] (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:64:33)
at Socket.go (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:478:48)
at Socket.emit (events.js:197:13)
at emitReadable_ (_stream_readable.js:539:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17),
isOperational: true }
第二次尝试
我的第一次尝试没有成功。所以,我尝试在重启服务器后创建新频道:
const q = 'tasks';
const { execSync } = require("child_process");
const amqplib = require("amqplib");
function createChannel() {
return amqplib.connect("amqp://root:toor@0.0.0.0:5672/")
.then((conn) => conn.createChannel());
}
Promise.all([createChannel(), createChannel()])
.then((channels) => {
// Let's say rabbitmq is down, and then up again
execSync("docker stop rabbitmq");
execSync("docker start rabbitmq");
return Promise.all([createChannel(), createChannel()]);
// return channels;
})
.then(async (channels) => {
const [publisherChannel, consumerChannel] = channels;
// publisher
await publisherChannel.assertQueue(q).then(function(ok) {
return publisherChannel.sendToQueue(q, Buffer.from("something to do"));
});
// consumer
await consumerChannel.assertQueue(q).then(function(ok) {
return consumerChannel.consume(q, function(msg) {
if (msg !== null) {
console.log(msg.content.toString());
consumerChannel.ack(msg);
}
});
});
})
.catch(console.warn);
而这一次,我得到了这个错误:
{ Error: Socket closed abruptly during opening handshake
at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)
at Socket.emit (events.js:202:15)
at endReadableNT (_stream_readable.js:1129:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17)
cause:
Error: Socket closed abruptly during opening handshake
at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)
at Socket.emit (events.js:202:15)
at endReadableNT (_stream_readable.js:1129:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17),
isOperational: true }
我不是很确定,但我认为这个错误与它可能与 https://github.com/squaremo/amqp.node/issues/101 有关。
我想要的
我希望workaround/solution在服务器重启后重新连接到rabbitmq。也欢迎任何 explanation/suggestion。
编辑
我尝试更深入地修改我的代码:
const q = 'tasks';
const { execSync } = require("child_process");
const amqplib = require("amqplib");
async function createConnection() {
console.log("connect");
const conn = amqplib.connect("amqp://root:toor@0.0.0.0:5672/");
console.log("connected");
return conn;
}
async function createChannel(conn) {
console.log("create channel");
const channel = conn.createChannel({durable: false});
console.log("channel created");
return channel;
}
async function createConnectionAndChannel() {
const conn = await createConnection();
const channel = await createChannel(conn);
return channel;
}
Promise.all([createConnectionAndChannel(), createConnectionAndChannel()])
.then((channels) => {
// Let's say rabbitmq is down, and then up again
console.log("restart server");
execSync("docker stop rabbitmq");
execSync("docker start rabbitmq");
console.log("server restarted");
return Promise.all([createConnectionAndChannel(), createConnectionAndChannel()]);
// return channels;
})
.then(async (channels) => {
console.log("channels created");
const [publisherChannel, consumerChannel] = channels;
// publisher
console.log("publish");
await publisherChannel.assertQueue(q).then(function(ok) {
console.log("published");
return publisherChannel.sendToQueue(q, Buffer.from("something to do"));
});
// consumer
console.log("consume");
await consumerChannel.assertQueue(q).then(function(ok) {
console.log("consumed");
return consumerChannel.consume(q, function(msg) {
if (msg !== null) {
console.log(msg.content.toString());
consumerChannel.ack(msg);
}
});
});
})
.catch(console.warn);
我得到这个输出:
connect
connected
connect
connected
create channel
channel created
create channel
channel created
restart server
server restarted
connect
connected
connect
connected
{ Error: Socket closed abruptly during opening handshake
at Socket.endWhileOpening (/home/guldan/Projects/kata/merapi-plugin-service-rabbit/node_modules/amqplib/lib/connection.js:260:17)
at Socket.emit (events.js:202:15)
at endReadableNT (_stream_readable.js:1129:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17)
cause:
Error: Socket closed abruptly during opening handshake
at Socket.endWhileOpening (/home/guldan/Projects/kata/merapi-plugin-service-rabbit/node_modules/amqplib/lib/connection.js:260:17)
at Socket.emit (events.js:202:15)
at endReadableNT (_stream_readable.js:1129:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17),
isOperational: true }
所以我猜 amqplib 能够重新连接但是无法创建通道。
终于找到答案了:
const { execSync } = require("child_process");
const amqp = require("amqplib");
async function sleep(delay) {
return new Promise((resolve, reject) => {
setTimeout(resolve, delay);
});
}
async function createChannel(config) {
const { url, publishers, listeners } = Object.assign({url: "", publishers: {}, listeners: {}}, config);
try {
// create connection
const connection = await amqp.connect(url);
let channel = null;
connection._channels = [];
connection.on("error", (error) => {
console.error("Connection error : ", config, error);
});
connection.on("close", async (error) => {
if (channel) {
channel.close();
}
console.error("Connection close : ", config, error);
await sleep(1000);
createChannel(config);
});
// create channel
channel = await connection.createConfirmChannel();
channel.on("error", (error) => {
console.error("Channel error : ", config, error);
});
channel.on("close", (error) => {
console.error("Channel close : ", config, error);
});
// register listeners
for (queue in listeners) {
const callback = listeners[queue];
channel.assertQueue(queue, { durable: false });
channel.consume(queue, callback);
}
// publish
for (queue in publishers) {
const message = publishers[queue];
channel.assertQueue(queue, { durable: false });
channel.sendToQueue(queue, message);
}
return channel;
} catch (error) {
console.error("Create connection error : ", error);
await sleep(1000);
createChannel(config);
}
}
async function main() {
// publish "hello" message to queue
const channelPublish = await createChannel({
url: "amqp://root:toor@0.0.0.0:5672",
publishers: {
"queue": Buffer.from("hello"),
}
});
// restart rabbitmq
execSync("docker stop rabbitmq");
execSync("docker start rabbitmq");
// consume message from queue
const channelConsume = await createChannel({
url: "amqp://root:toor@0.0.0.0:5672",
listeners: {
"queue": (message) => {
console.log("Receive message ", message.content.toString());
},
}
});
return true;
}
main().catch((error) => console.error(error));
基本上,我将频道附加到连接中。因此,每当连接产生错误(例如:rabbitmq 服务器关闭)时,程序将等待一秒钟,然后尝试创建一个新连接。
缺点是,我会丢失对旧连接及其通道的引用。
为了克服这个麻烦,我将队列、发布者和消费者信息存储在其他地方(在这种情况下我将其作为createChannel
的参数)。
最后,每次重新连接,我也会制作频道以及构建每个发布者和消费者。
不太方便,但至少可以按预期工作。
在这里丢失引用似乎无关紧要,rabbitmq 仅在通道不存在时才创建通道,如果通道存在,新变量引用将指向同一通道。因此,不尝试存储引用可能会使应用程序更轻量级。
我遇到这个问题是因为我传递了 URL "amqp://guest:guest@localhost:15672/" 但现在我传递了 URL "amqp://localhost " 在连接 URL.After 中传递我的问题已得到修复。
我想做什么
我尝试创建 rabbit-mq 发布者和订阅者。它按预期工作,直到我尝试重新启动我的 rabbit-mq 服务器。
什么有效
我使用 rabbitmq:3-management
docker 图像、ampqlib 5.3
和 Node.js 11.10.0
来制作这个简单的程序:
const q = 'tasks';
const { execSync } = require("child_process");
const amqplib = require("amqplib");
function createChannel() {
return amqplib.connect("amqp://root:toor@0.0.0.0:5672/")
.then((conn) => conn.createChannel());
}
Promise.all([createChannel(), createChannel()])
.then(async (channels) => {
const [publisherChannel, consumerChannel] = channels;
// publisher
await publisherChannel.assertQueue(q).then(function(ok) {
return publisherChannel.sendToQueue(q, Buffer.from("something to do"));
});
// consumer
await consumerChannel.assertQueue(q).then(function(ok) {
return consumerChannel.consume(q, function(msg) {
if (msg !== null) {
console.log(msg.content.toString());
consumerChannel.ack(msg);
}
});
});
})
.catch(console.warn);
所以,首先,我制作了两个频道。一个作为发布者,一个作为消费者。
发布者向 tasks
队列发送 something to do
消息。
消费者然后捕获消息并使用 console.log
将其打印到屏幕上。
它按预期工作。
什么不起作用
第一次尝试
const q = 'tasks';
const { execSync } = require("child_process");
const amqplib = require("amqplib");
function createChannel() {
return amqplib.connect("amqp://root:toor@0.0.0.0:5672/")
.then((conn) => conn.createChannel());
}
Promise.all([createChannel(), createChannel()])
.then((channels) => {
// Let's say rabbitmq is down, and then up again
execSync("docker stop rabbitmq");
execSync("docker start rabbitmq");
return channels;
})
.then(async (channels) => {
const [publisherChannel, consumerChannel] = channels;
// publisher
await publisherChannel.assertQueue(q).then(function(ok) {
return publisherChannel.sendToQueue(q, Buffer.from("something to do"));
});
// consumer
await consumerChannel.assertQueue(q).then(function(ok) {
return consumerChannel.consume(q, function(msg) {
if (msg !== null) {
console.log(msg.content.toString());
consumerChannel.ack(msg);
}
});
});
})
.catch(console.warn);
与我之前的尝试类似,但这次我尝试在继续之前停止并启动 rabbit-mq 容器(重新启动服务器)。
它不起作用,我收到此错误:
{ Error: Socket closed abruptly during opening handshake
at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)
at Socket.emit (events.js:202:15)
at endReadableNT (_stream_readable.js:1129:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17)
cause:
Error: Socket closed abruptly during opening handshake
at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)
at Socket.emit (events.js:202:15)
at endReadableNT (_stream_readable.js:1129:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17),
isOperational: true }
[guldan@draenor labs]$ node --version
v11.10.0
[guldan@draenor labs]$ docker start rabbitmq && node test.js
rabbitmq
{ Error: Channel ended, no reply will be forthcoming
at rej (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:195:7)
at Channel.C._rejectPending (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:197:28)
at Channel.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:165:8)
at Connection.C._closeChannels (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:394:18)
at Connection.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:401:8)
at Object.accept (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:96:18)
at Connection.mainAccept [as accept] (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:64:33)
at Socket.go (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:478:48)
at Socket.emit (events.js:197:13)
at emitReadable_ (_stream_readable.js:539:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17)
cause:
Error: Channel ended, no reply will be forthcoming
at rej (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:195:7)
at Channel.C._rejectPending (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:197:28)
at Channel.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:165:8)
at Connection.C._closeChannels (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:394:18)
at Connection.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:401:8)
at Object.accept (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:96:18)
at Connection.mainAccept [as accept] (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:64:33)
at Socket.go (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:478:48)
at Socket.emit (events.js:197:13)
at emitReadable_ (_stream_readable.js:539:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17),
isOperational: true }
第二次尝试
我的第一次尝试没有成功。所以,我尝试在重启服务器后创建新频道:
const q = 'tasks';
const { execSync } = require("child_process");
const amqplib = require("amqplib");
function createChannel() {
return amqplib.connect("amqp://root:toor@0.0.0.0:5672/")
.then((conn) => conn.createChannel());
}
Promise.all([createChannel(), createChannel()])
.then((channels) => {
// Let's say rabbitmq is down, and then up again
execSync("docker stop rabbitmq");
execSync("docker start rabbitmq");
return Promise.all([createChannel(), createChannel()]);
// return channels;
})
.then(async (channels) => {
const [publisherChannel, consumerChannel] = channels;
// publisher
await publisherChannel.assertQueue(q).then(function(ok) {
return publisherChannel.sendToQueue(q, Buffer.from("something to do"));
});
// consumer
await consumerChannel.assertQueue(q).then(function(ok) {
return consumerChannel.consume(q, function(msg) {
if (msg !== null) {
console.log(msg.content.toString());
consumerChannel.ack(msg);
}
});
});
})
.catch(console.warn);
而这一次,我得到了这个错误:
{ Error: Socket closed abruptly during opening handshake
at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)
at Socket.emit (events.js:202:15)
at endReadableNT (_stream_readable.js:1129:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17)
cause:
Error: Socket closed abruptly during opening handshake
at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)
at Socket.emit (events.js:202:15)
at endReadableNT (_stream_readable.js:1129:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17),
isOperational: true }
我不是很确定,但我认为这个错误与它可能与 https://github.com/squaremo/amqp.node/issues/101 有关。
我想要的
我希望workaround/solution在服务器重启后重新连接到rabbitmq。也欢迎任何 explanation/suggestion。
编辑
我尝试更深入地修改我的代码:
const q = 'tasks';
const { execSync } = require("child_process");
const amqplib = require("amqplib");
async function createConnection() {
console.log("connect");
const conn = amqplib.connect("amqp://root:toor@0.0.0.0:5672/");
console.log("connected");
return conn;
}
async function createChannel(conn) {
console.log("create channel");
const channel = conn.createChannel({durable: false});
console.log("channel created");
return channel;
}
async function createConnectionAndChannel() {
const conn = await createConnection();
const channel = await createChannel(conn);
return channel;
}
Promise.all([createConnectionAndChannel(), createConnectionAndChannel()])
.then((channels) => {
// Let's say rabbitmq is down, and then up again
console.log("restart server");
execSync("docker stop rabbitmq");
execSync("docker start rabbitmq");
console.log("server restarted");
return Promise.all([createConnectionAndChannel(), createConnectionAndChannel()]);
// return channels;
})
.then(async (channels) => {
console.log("channels created");
const [publisherChannel, consumerChannel] = channels;
// publisher
console.log("publish");
await publisherChannel.assertQueue(q).then(function(ok) {
console.log("published");
return publisherChannel.sendToQueue(q, Buffer.from("something to do"));
});
// consumer
console.log("consume");
await consumerChannel.assertQueue(q).then(function(ok) {
console.log("consumed");
return consumerChannel.consume(q, function(msg) {
if (msg !== null) {
console.log(msg.content.toString());
consumerChannel.ack(msg);
}
});
});
})
.catch(console.warn);
我得到这个输出:
connect
connected
connect
connected
create channel
channel created
create channel
channel created
restart server
server restarted
connect
connected
connect
connected
{ Error: Socket closed abruptly during opening handshake
at Socket.endWhileOpening (/home/guldan/Projects/kata/merapi-plugin-service-rabbit/node_modules/amqplib/lib/connection.js:260:17)
at Socket.emit (events.js:202:15)
at endReadableNT (_stream_readable.js:1129:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17)
cause:
Error: Socket closed abruptly during opening handshake
at Socket.endWhileOpening (/home/guldan/Projects/kata/merapi-plugin-service-rabbit/node_modules/amqplib/lib/connection.js:260:17)
at Socket.emit (events.js:202:15)
at endReadableNT (_stream_readable.js:1129:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17),
isOperational: true }
所以我猜 amqplib 能够重新连接但是无法创建通道。
终于找到答案了:
const { execSync } = require("child_process");
const amqp = require("amqplib");
async function sleep(delay) {
return new Promise((resolve, reject) => {
setTimeout(resolve, delay);
});
}
async function createChannel(config) {
const { url, publishers, listeners } = Object.assign({url: "", publishers: {}, listeners: {}}, config);
try {
// create connection
const connection = await amqp.connect(url);
let channel = null;
connection._channels = [];
connection.on("error", (error) => {
console.error("Connection error : ", config, error);
});
connection.on("close", async (error) => {
if (channel) {
channel.close();
}
console.error("Connection close : ", config, error);
await sleep(1000);
createChannel(config);
});
// create channel
channel = await connection.createConfirmChannel();
channel.on("error", (error) => {
console.error("Channel error : ", config, error);
});
channel.on("close", (error) => {
console.error("Channel close : ", config, error);
});
// register listeners
for (queue in listeners) {
const callback = listeners[queue];
channel.assertQueue(queue, { durable: false });
channel.consume(queue, callback);
}
// publish
for (queue in publishers) {
const message = publishers[queue];
channel.assertQueue(queue, { durable: false });
channel.sendToQueue(queue, message);
}
return channel;
} catch (error) {
console.error("Create connection error : ", error);
await sleep(1000);
createChannel(config);
}
}
async function main() {
// publish "hello" message to queue
const channelPublish = await createChannel({
url: "amqp://root:toor@0.0.0.0:5672",
publishers: {
"queue": Buffer.from("hello"),
}
});
// restart rabbitmq
execSync("docker stop rabbitmq");
execSync("docker start rabbitmq");
// consume message from queue
const channelConsume = await createChannel({
url: "amqp://root:toor@0.0.0.0:5672",
listeners: {
"queue": (message) => {
console.log("Receive message ", message.content.toString());
},
}
});
return true;
}
main().catch((error) => console.error(error));
基本上,我将频道附加到连接中。因此,每当连接产生错误(例如:rabbitmq 服务器关闭)时,程序将等待一秒钟,然后尝试创建一个新连接。
缺点是,我会丢失对旧连接及其通道的引用。
为了克服这个麻烦,我将队列、发布者和消费者信息存储在其他地方(在这种情况下我将其作为createChannel
的参数)。
最后,每次重新连接,我也会制作频道以及构建每个发布者和消费者。
不太方便,但至少可以按预期工作。
在这里丢失引用似乎无关紧要,rabbitmq 仅在通道不存在时才创建通道,如果通道存在,新变量引用将指向同一通道。因此,不尝试存储引用可能会使应用程序更轻量级。
我遇到这个问题是因为我传递了 URL "amqp://guest:guest@localhost:15672/" 但现在我传递了 URL "amqp://localhost " 在连接 URL.After 中传递我的问题已得到修复。