为什么 Promise.race 在 kafkajs eachMessage 回调中没有解析
Why does Promise.race not resolve in kafkajs eachMessage callback
我定义了这样的承诺...
const result = await Promise.race([
new Promise(resolve => {
consumer.run({
eachMessage: ({ message }) => {
const data = JSON.parse(message.value.toString());
if (data.payload.template
&& data.payload.template.id === '...'
&& data.payload.to[0].email === email) {
console.log('Should resolve!')
resolve(data.payload.template.variables.link);
console.log('resolved');
consumer.pause();
consumer.disconnect();
}
},
});
}),
new Promise((_, reject) => setTimeout(reject, 3000))
]);
console.log('result is ', result);
return result;
我可以解决,但它最后没有打印结果,似乎超时和实际承诺都没有按预期工作?这是为什么?我怀疑它与在 kafka js 回调中使用 resolve 有关?
更新:似乎 Promise.race()
没有解决,但为什么呢?
我怀疑你的“成功方”承诺无意中抛出,而你正在默默地吞下错误。
使用 consumer
的模型最小实现(成功或失败 50/50),以下代码有效。
运行 代码示例几次以查看两种情况。
var consumer = {
interval: null,
counter: 0,
run: function (config) {
this.interval = setInterval(() => {
this.counter++;
console.log(`Consumer: message #${this.counter}`);
config.eachMessage({message: this.counter});
}, 250);
},
pause: function () {
console.log('Consumer: paused');
clearInterval(this.interval);
},
disconnect: function () {
console.log('Consumer: disconnected');
}
};
Promise.race([
new Promise(resolve => {
const expectedMsg = Math.random() < 0.5 ? 3 : 4;
consumer.run({
eachMessage: ({ message }) => {
if (message === expectedMsg) resolve("success");
}
});
}),
new Promise((_, reject) => setTimeout(() => {
reject('timeout');
consumer.pause();
consumer.disconnect();
}, 1000))
]).then((result) => {
console.log(`Result: ${result}`);
}).catch((err) => {
console.log(`ERROR: ${err}`);
});
我也已将 consumer.pause()
和 consumer.disconnect()
移动到“超时端”承诺,这样可以保证消费者断开连接,尽管它可能 运行 稍微长一点比成功案例所需的更多。
我定义了这样的承诺...
const result = await Promise.race([
new Promise(resolve => {
consumer.run({
eachMessage: ({ message }) => {
const data = JSON.parse(message.value.toString());
if (data.payload.template
&& data.payload.template.id === '...'
&& data.payload.to[0].email === email) {
console.log('Should resolve!')
resolve(data.payload.template.variables.link);
console.log('resolved');
consumer.pause();
consumer.disconnect();
}
},
});
}),
new Promise((_, reject) => setTimeout(reject, 3000))
]);
console.log('result is ', result);
return result;
我可以解决,但它最后没有打印结果,似乎超时和实际承诺都没有按预期工作?这是为什么?我怀疑它与在 kafka js 回调中使用 resolve 有关?
更新:似乎 Promise.race()
没有解决,但为什么呢?
我怀疑你的“成功方”承诺无意中抛出,而你正在默默地吞下错误。
使用 consumer
的模型最小实现(成功或失败 50/50),以下代码有效。
运行 代码示例几次以查看两种情况。
var consumer = {
interval: null,
counter: 0,
run: function (config) {
this.interval = setInterval(() => {
this.counter++;
console.log(`Consumer: message #${this.counter}`);
config.eachMessage({message: this.counter});
}, 250);
},
pause: function () {
console.log('Consumer: paused');
clearInterval(this.interval);
},
disconnect: function () {
console.log('Consumer: disconnected');
}
};
Promise.race([
new Promise(resolve => {
const expectedMsg = Math.random() < 0.5 ? 3 : 4;
consumer.run({
eachMessage: ({ message }) => {
if (message === expectedMsg) resolve("success");
}
});
}),
new Promise((_, reject) => setTimeout(() => {
reject('timeout');
consumer.pause();
consumer.disconnect();
}, 1000))
]).then((result) => {
console.log(`Result: ${result}`);
}).catch((err) => {
console.log(`ERROR: ${err}`);
});
我也已将 consumer.pause()
和 consumer.disconnect()
移动到“超时端”承诺,这样可以保证消费者断开连接,尽管它可能 运行 稍微长一点比成功案例所需的更多。