在 Kafka Stream 中处理消息时发生错误时重新处理消息
Reprocessing a message when an error occurs while processing it in the Kafka Stream
我有一个基于 Kafka Streams 的简单 Spring 应用程序,它使用来自传入主题的消息,进行 map
转换并打印此消息。 KStream
这样配置
@Bean
public KStream<?, ?> processingPipeline(StreamsBuilder builder, MyTransformer myTransformer,
PrintAction printAction, String topicName) {
KStream<String, JsonNode> source = builder.stream(topicName,
Consumed.with(Serdes.String(), new JsonSerde<>(JsonNode.class)));
// @formatter:off
source
.map(myTransformer)
.foreach(printAction);
// @formatter:on
return source;
}
内部 MyTransformer
我正在调用外部微服务,此时可能会关闭。如果调用失败(通常抛出 RuntimeException
),我将无法进行转换。
这里的问题是,如果在之前的处理过程中发生任何错误,有什么方法可以再次在 Streams 应用程序中重新处理消息?
根据我目前的研究,这里没有办法这样做,我唯一的可能就是将消息推送到死信主题中,如果它再次失败,我会尝试在将来处理它我再次将它推送到 DLT并以这种方式重试。
如果在 Kafka Streams 处理期间发生任何未捕获的异常,您的流将状态更改为 ERROR 并停止使用发生错误的分区的传入消息。
您需要自己捕获异常。可以通过以下方式实现重试:1) 使用 Spring RetryTemplate
调用外部微服务(但请记住,您将延迟使用来自特定分区的消息),或 2) 将失败的消息推送到另一个微服务稍后重新处理的主题(如您所建议)
更新自 kafka-streams
2.8.0
自 kafka-streams
2.8.0
以来,您可以自动替换失败的流线程(由未捕获的异常引起)
使用 KafkaStreams
方法 void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);
和 StreamThreadExceptionResponse.REPLACE_THREAD
。有关详细信息,请查看 Kafka Streams Specific Uncaught Exception Handler
kafkaStreams.setUncaughtExceptionHandler(ex -> {
log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
我有一个基于 Kafka Streams 的简单 Spring 应用程序,它使用来自传入主题的消息,进行 map
转换并打印此消息。 KStream
这样配置
@Bean
public KStream<?, ?> processingPipeline(StreamsBuilder builder, MyTransformer myTransformer,
PrintAction printAction, String topicName) {
KStream<String, JsonNode> source = builder.stream(topicName,
Consumed.with(Serdes.String(), new JsonSerde<>(JsonNode.class)));
// @formatter:off
source
.map(myTransformer)
.foreach(printAction);
// @formatter:on
return source;
}
内部 MyTransformer
我正在调用外部微服务,此时可能会关闭。如果调用失败(通常抛出 RuntimeException
),我将无法进行转换。
这里的问题是,如果在之前的处理过程中发生任何错误,有什么方法可以再次在 Streams 应用程序中重新处理消息?
根据我目前的研究,这里没有办法这样做,我唯一的可能就是将消息推送到死信主题中,如果它再次失败,我会尝试在将来处理它我再次将它推送到 DLT并以这种方式重试。
如果在 Kafka Streams 处理期间发生任何未捕获的异常,您的流将状态更改为 ERROR 并停止使用发生错误的分区的传入消息。
您需要自己捕获异常。可以通过以下方式实现重试:1) 使用 Spring RetryTemplate
调用外部微服务(但请记住,您将延迟使用来自特定分区的消息),或 2) 将失败的消息推送到另一个微服务稍后重新处理的主题(如您所建议)
更新自 kafka-streams
2.8.0
自 kafka-streams
2.8.0
以来,您可以自动替换失败的流线程(由未捕获的异常引起)
使用 KafkaStreams
方法 void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);
和 StreamThreadExceptionResponse.REPLACE_THREAD
。有关详细信息,请查看 Kafka Streams Specific Uncaught Exception Handler
kafkaStreams.setUncaughtExceptionHandler(ex -> {
log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});