在 Kafka Stream 中处理消息时发生错误时重新处理消息

Reprocessing a message when an error occurs while processing it in the Kafka Stream

我有一个基于 Kafka Streams 的简单 Spring 应用程序,它使用来自传入主题的消息,进行 map 转换并打印此消息。 KStream这样配置

@Bean
public KStream<?, ?> processingPipeline(StreamsBuilder builder, MyTransformer myTransformer,
         PrintAction printAction, String topicName) {
    KStream<String, JsonNode> source = builder.stream(topicName,
                Consumed.with(Serdes.String(), new JsonSerde<>(JsonNode.class)));
    // @formatter:off
    source
        .map(myTransformer)
        .foreach(printAction);
    // @formatter:on
    return source;
}

内部 MyTransformer 我正在调用外部微服务,此时可能会关闭。如果调用失败(通常抛出 RuntimeException),我将无法进行转换。

这里的问题是,如果在之前的处理过程中发生任何错误,有什么方法可以再次在 Streams 应用程序中重新处理消息?

根据我目前的研究,这里没有办法这样做,我唯一的可能就是将消息推送到死信主题中,如果它再次失败,我会尝试在将来处理它我再次将它推送到 DLT并以这种方式重试。

如果在 Kafka Streams 处理期间发生任何未捕获的异常,您的流将状态更改为 ERROR 并停止使用发生错误的分区的传入消息。 您需要自己捕获异常。可以通过以下方式实现重试:1) 使用 Spring RetryTemplate 调用外部微服务(但请记住,您将延迟使用来自特定分区的消息),或 2) 将失败的消息推送到另一个微服务稍后重新处理的主题(如您所建议)


更新自 kafka-streams 2.8.0

kafka-streams 2.8.0 以来,您可以自动替换失败的流线程(由未捕获的异常引起) 使用 KafkaStreams 方法 void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);StreamThreadExceptionResponse.REPLACE_THREAD。有关详细信息,请查看 Kafka Streams Specific Uncaught Exception Handler

kafkaStreams.setUncaughtExceptionHandler(ex -> {
    log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});