等待在 nodejs 中正确连接到 rabbit

Await for proper connection to rabbit in nodejs

我尝试为 amqplib/callback_api 编写简单的 eventemitter 包装器。当兔子不可用或断开连接时,我无法处理情况。 我有方法 getConnect which returns Promise,它在连接建立时解析。但是如果拒绝连接,Promise 显然会拒绝。如何在连接未建立时强制此方法重新连接

/**
     * Async method getConnect for connection
     * @returns {Promise<*>}
     */
    getConnect = async () => {
        return new Promise((resolve, reject) => {
            amqp.connect(this.config.url, async function(err, conn) {
                    if (err) {
                        reject(err);
                    }
                    resolve(conn);
            })
        })
    };

完整代码在这里https://github.com/kimonniez/rabbitEE

也许,我已经很困了,但我完全糊涂了:) 在此先感谢!

如果您只想一直尝试连接直到建立连接,您可以将 getConnect 方法包装到新的 keepConnect 方法中:

keepConnect = async () => {
   while (true) {
      try {
         let conn = await getConnect()
         return conn
      } catch (e) {}
  }
}

但我认为通过更改 while 条件来实现 "try to connect for n times" 之类的东西会更好。通常,"while true" 解决方案不干净并且可能执行不当,并存在减慢事件循环的风险(想象一下如果 connect 方法总是 return 在几毫秒内出错)。

您还可以使用 keepConnect 包装器作为想法,在连接尝试之间实现渐进延迟系统。

如果你想在连接丢失时重新连接,那么这与 Rabbit(我不知道)和他的事件有关。

包裹你的Promise inside an Observable

Promise 不是为处理 “重试” 逻辑而构建的。如果你想这样做,你应该使用 rxjs library 查看 Observables。这将允许您在捕获错误时使用任意时间间隔重试。

const { from, interval, of } = rxjs;
const { catchError, mergeMap, tap, skipWhile, take } = rxjs.operators;

const THRESHOLD = 3;
const RETRY_INTERVAL = 1000;

// Equivalent to 'amqp.connect'
const functionThatThrows = number =>
  number < THRESHOLD
    ? Promise.reject(new Error("ERROR"))
    : Promise.resolve("OK");

// Equivalent to `getConnect`
const getConnect = () =>
  interval(RETRY_INTERVAL)
    .pipe(
      mergeMap(x => from(functionThatThrows(x)).pipe(catchError(e => of(e)))),
      skipWhile(x => {
        const isError = x instanceof Error;
        if (isError) console.log('Found error. Retrying...');
        return isError;
      }),
      take(1)
    ).toPromise();

// Resolve only if the inner Promise is resolved
getConnect().then(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>

说明

  1. 创建 interval1000 的来源。这意味着它将每秒重试
  2. 调用你的 amqp.connect 相当于我示例中的 functionThatThrows
  3. 使用 catchError 运算符捕获错误并 return 它
  4. 当 returned 对象出错时跳过。只有当您的 Promise 已被解决且未被拒绝时,这将允许您 解决
  5. 使用take(1)
  6. 获取第一个解析结果
  7. 使用 toPromise 效用函数
  8. 将您的 observable 转换为 promise
  9. 像使用标准 Promise
  10. 一样调用您的函数并附加 then