Kafka Streams Shutdown Hook 和同一 Stream 应用中的意外异常处理

Kafka Streams Shutdown Hook and Unexpected Exception Handling in the same Stream application

我的任务是拆除开发环境并从废料中重新设置它以验证我们的 CI-CD 流程;唯一的问题是我搞砸了创建一个主题,因此 Kafka Streams 应用程序退出并出错。

我深入研究并发现了问题并纠正了它,但是当我深入研究时,我 运行 遇到了另一个奇怪的小问题。

我这样实现了一个意外异常处理程序:

streams.setUncaughtExceptionHandler((t, e) -> {
    logger.fatal("Caught unhandled Kafka Streams Exception:", e);
    // Do some exception handling.
    streams.close();

    // Maybe do some more exception handling.
    // Open a lock that is waiting after streams.start() call 
    // to let application exit normally
    shutdownLatch.countDown();
});

问题是,如果应用程序在调用 KafkaStreams::close 时由于主题错误而抛出异常,则应用程序在尝试调用 KafkaStreams::waitOnState 后似乎死锁在 WindowsSelectorImpl::poll 中.

我认为在异常处理程序中调用 KafkaStreams::close 可能是个问题,但我发现这个 and a comment from Matthias J. Sax 说在异常处理程序中调用 KafkaStreams::Close 应该没问题注意不要从多个线程调用 KafkaStreams::close。

问题是我想实现一个关闭挂钩以根据请求优雅地终止 Steam 应用程序,以及实现 UnexpectedException 处理程序以在出现异常时优雅地清理和终止。

我想出了以下解决方案,它在调用关闭之前检查 KafkaStreams 状态,它确实有效,但它似乎有点不确定,因为我可以看到除了 运行(可能是待定)之外的其他情况,我们在那里想要确保它调用的KafkaStreams::close。

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    logger.fatal("Caught Shutdown request");
    // Do some shutdown cleanup.
    if (streams.state().isRunning())
    {
        If this hook is called due to the Main exiting after handling 
        an exception we don't want to call close again. It doesn't 
        cause any errors but logs that the application was closed 
        a second time.
        streams.close(100L, TimeUnit.MILLISECONDS);
    }
    // Maybe do a little bit more clean up before system exits.
    System.exit(0);

}));

streams.setUncaughtExceptionHandler((t, e) -> {
    logger.fatal("Caught unhandled Kafka Streams Exception:", e);
    // Do some exception handling.
    if (streams.state().isRunning())
    {
        streams.close(100L, TimeUnit.MILLISECONDS);
    }
    // Maybe do some more exception handling.

    // Open the Gate to let application exit normally
    shutdownLatch.countDown();
    // Or Optionally call halt to immediately terminate and prevent call to Shutdown hook.
    Runtime.getRuntime().halt(0);
});

任何关于为什么在异常处理程序中调用 KafkaSteams:close 会导致这样的麻烦的建议,或者如果有更好的方法同时实现关闭挂钩和异常处理程序,将不胜感激?

从异常处理程序和关闭挂钩调用 close() 略有不同。 close() 如果从关闭挂钩调用(参见 https://issues.apache.org/jira/browse/KAFKA-4366)可能会死锁,因此,您应该超时调用它。

此外,该问题与从未捕获的异常处理程序中调用 System.exit() 有关,如 Jira 中所述。一般来说,调用 System.exit() 是相当苛刻的,应该避免恕我直言。

您的解决方案似乎也不是 100% 可靠的,因为 streams.state().isRunning() 可能会导致竞争条件。

使用超时的替代方法可能是仅在关闭挂钩和异常处理程序中设置 AtomicBoolean,并在布尔标志设置为时使用 "main()" 线程调用关闭真:

private final static AtomicBoolean stopStreams = new AtomicBoolean(false);

public static void main(String[] args) {
  // do stuff

  KafkaStreams streams = ...
  stream.setUncaughtExceptionHandler((t, e) -> {
    stopStreams.set(true);
  });

  Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    stopStreams.set(true);
  });

  while (!stopStreams.get()) {
    Thread.sleep(1000);
  }
  streams.close();
}