处理 Kafka 流中的异常

Handling exceptions in Kafka streams

浏览了多个帖子,但其中大部分与处理错误消息相关,而不是处理它们时的异常处理。

我想知道如何处理流应用程序收到的消息,并且在处理消息时出现异常?异常可能是由于网络故障、RuntimeException 等多种原因造成的,

这取决于你想对生产者端的异常做什么。 如果 producer 抛出异常(例如由于网络故障或 kafka broker 挂掉),stream 将默认挂掉。对于 kafka-streams 版本 1.1.0,您可以通过实现 ProductionExceptionHandler 来覆盖默认行为,如下所示:

public class CustomProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        log.error("Kafka message marked as processed although it failed. Message: [{}], destination topic: [{}]",  new String(record.value()), record.topic(), exception);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
    }

}

从 handle 方法中,如果您不希望流因异常而死,您可以 return CONTINUE,如果您希望流停止,则 return FAIL (失败是默认的)。 你需要在流配置中指定这个 class:

default.production.exception.handler=com.example.CustomProductionExceptionHandler

还要注意ProductionExceptionHandler只处理producer上的异常,不会处理流方法mapValues(..)filter(..)branch(..)等处理消息时的异常, 你需要用 try / catch 块包装这些方法逻辑(将你所有的方法逻辑放入 try 块以保证你将处理所有异常情况):

.filter((key, value) -> { try {..} catch (Exception e) {..} })

据我所知,我们不需要显式处理消费者端的异常,因为 kafka 流将稍后自动重试消费(因为在消费和处理消息之前不会更改偏移量);例如如果 kafka broker 在一段时间内无法访问,你会从 kafka 流中得到异常,当中断时,kafka 流将消耗所有消息。所以在这种情况下,我们只会有延迟,什么都没有 corrupted/lost.

使用 setUncaughtExceptionHandler 您将无法像使用 ProductionExceptionHandler 那样更改默认行为,使用它您只能记录错误或将消息发送到失败主题。


更新自 kafka-streams 2.8.0

kafka-streams 2.8.0 以来,您可以自动替换失败的流线程(由未捕获的异常引起) 使用 KafkaStreams 方法 void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);StreamThreadExceptionResponse.REPLACE_THREAD。有关详细信息,请查看 Kafka Streams Specific Uncaught Exception Handler

kafkaStreams.setUncaughtExceptionHandler(ex -> {
    log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});

对于消费者端的异常处理,

1) 您可以使用以下 属性.

在生产者中添加默认异常处理程序
"default.deserialization.exception.handler" = "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";

基本上apache提供了三个异常处理器类 as

1) LogAndContiuneExceptionHandler 可以作为

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
           LogAndContinueExceptionHandler.class);

2) LogAndFailExceptionHandler

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
           LogAndFailExceptionHandler.class);

3) LogAndSkipOnInvalidTimestamp

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
           LogAndSkipOnInvalidTimestamp.class);

对于自定义异常处理,

1)您可以实现DeserializationExceptionHandler接口并覆盖handle()方法。

2) 或者你可以扩展上面提到的 类.

setUncaughtExceptionHandler 无助于处理异常,它在流由于某些未捕获的异常而终止后起作用。

Kafka 提供了几种处理异常的方法。一个简单的 try-catch{} 将有助于捕获处理器代码中的异常,但 kafka 反序列化异常(可能是由于数据问题)和生产异常(在与代理通信期间发生)需要DeserializationExceptionHandlerProductionExceptionHandler 分别。默认情况下,kafka 应用程序如果遇到其中任何一个都会失败。

你可以在这个post

上找到

在 Spring 云流中,您使用以下方法配置自定义反序列化处理程序:

  • spring.cloud.stream.kafka.streams.binder.configuration.default.deserialization.exception.handler=你的包-name.CustomLogAndContinueExceptionHandler

  • CustomLogAndContinueExceptionHandler 扩展 LogAndContinueExceptionHandler 或实现 DeserializationExceptionHandler

  • CustomLogAndContinueExceptionHandler DeserializationHandlerResponse.CONTINUE 或 FAIL 取决于您的用例

@Slf4j
public class CustomLogAndContinueExceptionHandler extends LogAndContinueExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record,
            Exception exception) {
.... some business logic here ....
        log.error("Message failed: taskId: {}, topic: {}, partition: {}, offset: {}, , detailerror : {}",
                context.taskId(), record.topic(), record.partition(), record.offset(), exception.getMessage());
        return DeserializationHandlerResponse.CONTINUE;
    }
}