为什么 Promise.race 在 kafkajs eachMessage 回调中没有解析

Why does Promise.race not resolve in kafkajs eachMessage callback

我定义了这样的承诺...

    const result = await Promise.race([
      new Promise(resolve => {
        consumer.run({
          eachMessage: ({ message }) => {
            const data = JSON.parse(message.value.toString());
            if (data.payload.template
              && data.payload.template.id === '...'
              && data.payload.to[0].email === email) {
              console.log('Should resolve!')
              resolve(data.payload.template.variables.link);
              console.log('resolved');

              consumer.pause();
              consumer.disconnect();
            }
          },
        });
      }),
      new Promise((_, reject) => setTimeout(reject, 3000))
    ]);
    console.log('result is ', result);
    return result;

我可以解决,但它最后没有打印结果,似乎超时和实际承诺都没有按预期工作?这是为什么?我怀疑它与在 kafka js 回调中使用 resolve 有关?


更新:似乎 Promise.race() 没有解决,但为什么呢?

我怀疑你的“成功方”承诺无意中抛出,而你正在默默地吞下错误。

使用 consumer 的模型最小实现(成功或失败 50/50),以下代码有效。

运行 代码示例几次以查看两种情况。

var consumer = {
  interval: null,
  counter: 0,
  run: function (config) {
    this.interval = setInterval(() => {
      this.counter++;
      console.log(`Consumer: message #${this.counter}`);
      config.eachMessage({message: this.counter});
    }, 250);
  },
  pause: function () {
    console.log('Consumer: paused');
    clearInterval(this.interval);
  },
  disconnect: function () {
    console.log('Consumer: disconnected');    
  }
};

Promise.race([
  new Promise(resolve => {
    const expectedMsg = Math.random() < 0.5 ? 3 : 4;
    consumer.run({
      eachMessage: ({ message }) => {
        if (message === expectedMsg) resolve("success");
      }
    });
  }),
  new Promise((_, reject) => setTimeout(() => {
    reject('timeout');
    consumer.pause();
    consumer.disconnect();
  }, 1000))
]).then((result) => {
  console.log(`Result: ${result}`);
}).catch((err) => {
  console.log(`ERROR: ${err}`);
});

我也已将 consumer.pause()consumer.disconnect() 移动到“超时端”承诺,这样可以保证消费者断开连接,尽管它可能 运行 稍微长一点比成功案例所需的更多。