等待在 nodejs 中正确连接到 rabbit
Await for proper connection to rabbit in nodejs
我尝试为 amqplib/callback_api 编写简单的 eventemitter 包装器。当兔子不可用或断开连接时,我无法处理情况。
我有方法 getConnect
which returns Promise,它在连接建立时解析。但是如果拒绝连接,Promise 显然会拒绝。如何在连接未建立时强制此方法重新连接
/**
* Async method getConnect for connection
* @returns {Promise<*>}
*/
getConnect = async () => {
return new Promise((resolve, reject) => {
amqp.connect(this.config.url, async function(err, conn) {
if (err) {
reject(err);
}
resolve(conn);
})
})
};
完整代码在这里https://github.com/kimonniez/rabbitEE
也许,我已经很困了,但我完全糊涂了:) 在此先感谢!
如果您只想一直尝试连接直到建立连接,您可以将 getConnect
方法包装到新的 keepConnect
方法中:
keepConnect = async () => {
while (true) {
try {
let conn = await getConnect()
return conn
} catch (e) {}
}
}
但我认为通过更改 while
条件来实现 "try to connect for n times" 之类的东西会更好。通常,"while true" 解决方案不干净并且可能执行不当,并存在减慢事件循环的风险(想象一下如果 connect 方法总是 return 在几毫秒内出错)。
您还可以使用 keepConnect
包装器作为想法,在连接尝试之间实现渐进延迟系统。
如果你想在连接丢失时重新连接,那么这与 Rabbit(我不知道)和他的事件有关。
包裹你的Promise
inside an Observable
Promise
不是为处理 “重试” 逻辑而构建的。如果你想这样做,你应该使用 rxjs
library 查看 Observables。这将允许您在捕获错误时使用任意时间间隔重试。
const { from, interval, of } = rxjs;
const { catchError, mergeMap, tap, skipWhile, take } = rxjs.operators;
const THRESHOLD = 3;
const RETRY_INTERVAL = 1000;
// Equivalent to 'amqp.connect'
const functionThatThrows = number =>
number < THRESHOLD
? Promise.reject(new Error("ERROR"))
: Promise.resolve("OK");
// Equivalent to `getConnect`
const getConnect = () =>
interval(RETRY_INTERVAL)
.pipe(
mergeMap(x => from(functionThatThrows(x)).pipe(catchError(e => of(e)))),
skipWhile(x => {
const isError = x instanceof Error;
if (isError) console.log('Found error. Retrying...');
return isError;
}),
take(1)
).toPromise();
// Resolve only if the inner Promise is resolved
getConnect().then(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>
说明
- 创建
interval
为 1000
的来源。这意味着它将每秒重试
- 调用你的
amqp.connect
相当于我示例中的 functionThatThrows
- 使用
catchError
运算符捕获错误并 return 它
- 当 returned 对象出错时跳过。只有当您的
Promise
已被解决且未被拒绝时,这将允许您 解决
- 使用
take(1)
获取第一个解析结果
- 使用
toPromise
效用函数 将您的 observable 转换为 promise
- 像使用标准
Promise
一样调用您的函数并附加 then
我尝试为 amqplib/callback_api 编写简单的 eventemitter 包装器。当兔子不可用或断开连接时,我无法处理情况。
我有方法 getConnect
which returns Promise,它在连接建立时解析。但是如果拒绝连接,Promise 显然会拒绝。如何在连接未建立时强制此方法重新连接
/**
* Async method getConnect for connection
* @returns {Promise<*>}
*/
getConnect = async () => {
return new Promise((resolve, reject) => {
amqp.connect(this.config.url, async function(err, conn) {
if (err) {
reject(err);
}
resolve(conn);
})
})
};
完整代码在这里https://github.com/kimonniez/rabbitEE
也许,我已经很困了,但我完全糊涂了:) 在此先感谢!
如果您只想一直尝试连接直到建立连接,您可以将 getConnect
方法包装到新的 keepConnect
方法中:
keepConnect = async () => {
while (true) {
try {
let conn = await getConnect()
return conn
} catch (e) {}
}
}
但我认为通过更改 while
条件来实现 "try to connect for n times" 之类的东西会更好。通常,"while true" 解决方案不干净并且可能执行不当,并存在减慢事件循环的风险(想象一下如果 connect 方法总是 return 在几毫秒内出错)。
您还可以使用 keepConnect
包装器作为想法,在连接尝试之间实现渐进延迟系统。
如果你想在连接丢失时重新连接,那么这与 Rabbit(我不知道)和他的事件有关。
包裹你的Promise
inside an Observable
Promise
不是为处理 “重试” 逻辑而构建的。如果你想这样做,你应该使用 rxjs
library 查看 Observables。这将允许您在捕获错误时使用任意时间间隔重试。
const { from, interval, of } = rxjs;
const { catchError, mergeMap, tap, skipWhile, take } = rxjs.operators;
const THRESHOLD = 3;
const RETRY_INTERVAL = 1000;
// Equivalent to 'amqp.connect'
const functionThatThrows = number =>
number < THRESHOLD
? Promise.reject(new Error("ERROR"))
: Promise.resolve("OK");
// Equivalent to `getConnect`
const getConnect = () =>
interval(RETRY_INTERVAL)
.pipe(
mergeMap(x => from(functionThatThrows(x)).pipe(catchError(e => of(e)))),
skipWhile(x => {
const isError = x instanceof Error;
if (isError) console.log('Found error. Retrying...');
return isError;
}),
take(1)
).toPromise();
// Resolve only if the inner Promise is resolved
getConnect().then(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>
说明
- 创建
interval
为1000
的来源。这意味着它将每秒重试 - 调用你的
amqp.connect
相当于我示例中的functionThatThrows
- 使用
catchError
运算符捕获错误并 return 它 - 当 returned 对象出错时跳过。只有当您的
Promise
已被解决且未被拒绝时,这将允许您 解决 - 使用
take(1)
获取第一个解析结果
- 使用
toPromise
效用函数 将您的 observable 转换为 promise
- 像使用标准
Promise
一样调用您的函数并附加
then