检查 Kafka 流线程的健康状况
Checking health of Kafka Stream Threads
当某些流线程死亡时(例如由于异常),我不想继续,而是重新启动进程。
为了做到这一点,我需要识别这个状态。
我知道我可以使用 kafkaStream.state()
,但它会检查整个 kstreams 的状态。意思是如果只有一个 StreamThread 死了,它不会被 kafkaStream.state()
.
发现
让我在代码中知道所有 StreamThread 都处于活动状态和工作状态的最佳方式是什么?
Update :向 KafkaStreams#close()
添加超时,因为它可能导致死锁,正如 Matthias 所述 in the comment
如果你想检测是否有任何 StreamThreads 已经死亡,那么你可以使用 KafkaStreams#setUncaughtExceptionHandler()
,你可以停止流式传输并退出应用程序:
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
logger.error("Exiting ", e);
kafkaStreams.close(10);
System.exit(1);//exit with error code so container can restart this app
});
当某些流线程死亡时(例如由于异常),我不想继续,而是重新启动进程。
为了做到这一点,我需要识别这个状态。
我知道我可以使用 kafkaStream.state()
,但它会检查整个 kstreams 的状态。意思是如果只有一个 StreamThread 死了,它不会被 kafkaStream.state()
.
让我在代码中知道所有 StreamThread 都处于活动状态和工作状态的最佳方式是什么?
Update :向 KafkaStreams#close()
添加超时,因为它可能导致死锁,正如 Matthias 所述 in the comment
如果你想检测是否有任何 StreamThreads 已经死亡,那么你可以使用 KafkaStreams#setUncaughtExceptionHandler()
,你可以停止流式传输并退出应用程序:
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
logger.error("Exiting ", e);
kafkaStreams.close(10);
System.exit(1);//exit with error code so container can restart this app
});