检查 Kafka 流线程的健康状况

Checking health of Kafka Stream Threads

当某些流线程死亡时(例如由于异常),我不想继续,而是重新启动进程。

为了做到这一点,我需要识别这个状态。

我知道我可以使用 kafkaStream.state(),但它会检查整个 kstreams 的状态。意思是如果只有一个 StreamThread 死了,它不会被 kafkaStream.state().

发现

让我在代码中知道所有 StreamThread 都处于活动状态和工作状态的最佳方式是什么?

Update :向 KafkaStreams#close() 添加超时,因为它可能导致死锁,正如 Matthias 所述 in the comment

如果你想检测是否有任何 StreamThreads 已经死亡,那么你可以使用 KafkaStreams#setUncaughtExceptionHandler(),你可以停止流式传输并退出应用程序:

kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
    logger.error("Exiting ", e);
    kafkaStreams.close(10);
    System.exit(1);//exit with error code so container can restart this app
});