使用 UncaughtExceptionHandler 重新启动或关闭流的正确方法

Correct way to restart or shutdown the stream using UncaughtExceptionHandler

我有一个带有以下驱动程序代码的流应用程序,用于实时消息转换。

String topicName = ...
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(topicName);

source.transform(() -> new MyTransformer()).to(...);

KafkaStreams streams = new KafkaStreams(builder, appConfig);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    public void uncaughtException(Thread t, Throwable e) {
        logger.error("UncaughtExceptionHandler " + e.getMessage());
        System.exit(0);
    }
});


streams.cleanUp();
streams.start();

Runtime.getRuntime().addShutdownHook(new  Thread(streams::close));

执行几分钟后,应用程序抛出以下异常,然后不再继续处理流。

[2017-02-22 14:24:35,139] ERROR [StreamThread-14] User provided listener org.apache.kafka.streams.processor.internals.StreamThread for group TRANSFORMATION-APP failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
org.apache.kafka.streams.errors.ProcessorStateException: task [0_11] Error while creating the state manager
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:89)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
    at org.apache.kafka.streams.processor.internals.StreamThread.access0(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread.onPartitionsAssigned(StreamThread.java:124)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: java.io.IOException: task [0_11] Failed to lock the state directory: /tmp/kafka-streams/TRANSFORMATION-APP/0_11
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
    ... 13 more

我试图清除 /tmp/kafka-streams/TRANSFORMATION-APP 目录并重新启动应用程序,但再次抛出相同的异常。我注意到的一件事是应用程序在转换所有积压消息之前工作正常,但在处理一些新消息后抛出异常!

有时还会抛出以下未捕获的异常。

[ERROR] 2017-02-22 12:40:54.804 [StreamThread-29] MyTransformer - UncaughtExceptionHandler task directory [/tmp/kafka-streams/TRANSFORMATION-APP/0_24] doesn't exist and couldn't be created

[ERROR] 2017-02-22 12:42:30.148 [StreamThread-179] MyTransformer - UncaughtExceptionHandler stream-thread [StreamThread-179] Failed 
to rebalance

抛出(其中一个)这些异常后,应用程序仍然 运行 但没有在流中前进。

处理这些错误的正确方法是什么?。是否可以在不终止应用程序的情况下以编程方式重新启动流?此应用在 monit 下。在最坏的情况下,我宁愿正确终止应用程序(没有任何消息丢失),以便 monit 可以重新启动它。

输入主题有 100 个分区,我在应用程序配置中将 num.stream.threads 设置为 100。该应用程序在 Kafka 0.10.1.1-cp1.

Kakfa 0.10.1.x 在多线程方面存在一些错误。您可以升级到 0.10.2(AK 今天发布,CP 3.2 应该很快就会发布)或者您应用以下解决方法:

  • 仅使用单线程执行
  • 如果您需要更多线程,请启动更多实例
  • 为每个实例配置不同的状态目录

您可能还需要在重新启动之前删除本地状态目录(仅一次)以进入整体一致的应用程序状态。

无论如何,都不会丢失数据。 Kafka Streams 保证至少一次处理语义,即使在失败的情况下也是如此。这也适用于您的本地商店——在您删除本地状态目录后,启动时将从底层的 Kafka 更新日志主题重新创建这些状态(尽管这是一项昂贵的操作)。

UncaughtExceptionHandler 只是为您提供了一种确定线程已死亡的方法。它不会(直接)帮助重新启动您的应用程序。要恢复死线程,您需要完全关闭 KafkaStreams 个实例并 create/start 个新实例。我们希望在未来增加对此更好的支持。

我知道很久以前就有人问过这个问题,但是 post 会更新有关新的 Kafka-Streams 功能的信息。由于 Kafka-Streams 2.8.0,您可以自动替换失败的流线程(由未捕获的异常引起) 使用 KafkaStreams 方法 void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);StreamThreadExceptionResponse.REPLACE_THREAD。这样,失败的消息将在新的替换流上重新处理。 有关详细信息,请查看 Kafka Streams Specific Uncaught Exception Handler

kafkaStreams.setUncaughtExceptionHandler(ex -> {
    log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});

Kafka-Streams2.8.0之前,您可以自行实现重启失败的KafkaStreams的逻辑。思路是这样的:

KafkaStreams kafkaStreams = createYourKafkaStreams();
kafkaStreams.setStateListener(createErrorStateListener(sourceTopicName, kafkaStreams));

private KafkaStreams.StateListener createErrorStateListener(String sourceTopicName, KafkaStreams kafkaStreams) {
    return (newState, oldState) -> {
        if (newState == KafkaStreams.State.ERROR) {
            log.error("Kafka Stream is in ERROR state for source topic [{}]", sourceTopicName);
            replaceFailedKafkaStream(kafkaStreams, sourceTopicName);
        }
    };
}

// invoke this method either right after stream died, or by scheduling
private void replaceFailedKafkaStream(KafkaStreams kafkaStreams, String sourceTopicName) {
    kafkaStreams.close();
    KafkaStreams newKafkaStreams = createYourKafkaStreams();
    newKafkaStreams.setStateListener(createErrorStateListener(sourceTopicName, newKafkaStreams));
    newKafkaStreams.start();
}