kafkajs - 断开连接后优雅地停止 kafkajs 实例
kafkajs - gracefully stop kafkajs instance after disconnect
我在生产和集成测试中都使用了 kafkajs。
在我所有的测试之前,我正在创建一个包含生产者和消费者的 kafkajs 实例 connect/subscribe/run(eachMeassage)...
在我所有的测试之后,我想优雅地停止我所有的节点进程,包括 kafkajs 组件。
我实际上是这样做的:
export function stopHelper(): Promise<void> {
return new Promise<void>((resolve, reject) => {
if (kafkaHelperState === kafkaHelperStateStatus.running) {
kafkaHelperState = kafkaHelperStateStatus.stopping
log.debug("stopHelper", kafkaHelperState);
Promise.all([producer.disconnect, consumer.disconnect])
.then(() => {
kafkaHelperState = kafkaHelperStateStatus.stopped
log.info("stopHelper", kafkaHelperState);
resolve()
})
.catch(error => reject(error))
} else {
log.warn("stopHelper", "kafkaHelper is not " + kafkaHelperStateStatus.running)
}
})
}
Promises 似乎有效。
我可以看到我的集成测试套件已完成,生产者和消费者都已断开连接。
但是我的node进程还是运行什么都没做。
之前我用的是kafka-node。当我停止消费者时,我的节点进程结束而无需指定任何 process.exit(0)
有没有办法优雅地销毁node进程中的kafkajs实例?
Promise.all([producer.disconnect(), consumer.disconnect()])
而不是
Promise.all([producer.disconnect, consumer.disconnect])
我在生产和集成测试中都使用了 kafkajs。
在我所有的测试之前,我正在创建一个包含生产者和消费者的 kafkajs 实例 connect/subscribe/run(eachMeassage)... 在我所有的测试之后,我想优雅地停止我所有的节点进程,包括 kafkajs 组件。
我实际上是这样做的:
export function stopHelper(): Promise<void> {
return new Promise<void>((resolve, reject) => {
if (kafkaHelperState === kafkaHelperStateStatus.running) {
kafkaHelperState = kafkaHelperStateStatus.stopping
log.debug("stopHelper", kafkaHelperState);
Promise.all([producer.disconnect, consumer.disconnect])
.then(() => {
kafkaHelperState = kafkaHelperStateStatus.stopped
log.info("stopHelper", kafkaHelperState);
resolve()
})
.catch(error => reject(error))
} else {
log.warn("stopHelper", "kafkaHelper is not " + kafkaHelperStateStatus.running)
}
})
}
Promises 似乎有效。 我可以看到我的集成测试套件已完成,生产者和消费者都已断开连接。 但是我的node进程还是运行什么都没做。
之前我用的是kafka-node。当我停止消费者时,我的节点进程结束而无需指定任何 process.exit(0)
有没有办法优雅地销毁node进程中的kafkajs实例?
Promise.all([producer.disconnect(), consumer.disconnect()])
而不是
Promise.all([producer.disconnect, consumer.disconnect])