处理 Kafka 流中的异常
Handling exceptions in Kafka streams
浏览了多个帖子,但其中大部分与处理错误消息相关,而不是处理它们时的异常处理。
我想知道如何处理流应用程序收到的消息,并且在处理消息时出现异常?异常可能是由于网络故障、RuntimeException 等多种原因造成的,
- 有人可以建议正确的做法吗?我应该使用
setUncaughtExceptionHandler
?或者有更好的方法吗?
- 如何处理重试?
这取决于你想对生产者端的异常做什么。
如果 producer 抛出异常(例如由于网络故障或 kafka broker 挂掉),stream 将默认挂掉。对于 kafka-streams 版本 1.1.0,您可以通过实现 ProductionExceptionHandler
来覆盖默认行为,如下所示:
public class CustomProductionExceptionHandler implements ProductionExceptionHandler {
@Override
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
log.error("Kafka message marked as processed although it failed. Message: [{}], destination topic: [{}]", new String(record.value()), record.topic(), exception);
return ProductionExceptionHandlerResponse.CONTINUE;
}
@Override
public void configure(final Map<String, ?> configs) {
}
}
从 handle 方法中,如果您不希望流因异常而死,您可以 return CONTINUE
,如果您希望流停止,则 return FAIL
(失败是默认的)。
你需要在流配置中指定这个 class:
default.production.exception.handler=com.example.CustomProductionExceptionHandler
还要注意ProductionExceptionHandler
只处理producer上的异常,不会处理流方法mapValues(..)
、filter(..)
、branch(..)
等处理消息时的异常, 你需要用 try / catch 块包装这些方法逻辑(将你所有的方法逻辑放入 try 块以保证你将处理所有异常情况):
.filter((key, value) -> { try {..} catch (Exception e) {..} })
据我所知,我们不需要显式处理消费者端的异常,因为 kafka 流将稍后自动重试消费(因为在消费和处理消息之前不会更改偏移量);例如如果 kafka broker 在一段时间内无法访问,你会从 kafka 流中得到异常,当中断时,kafka 流将消耗所有消息。所以在这种情况下,我们只会有延迟,什么都没有 corrupted/lost.
使用 setUncaughtExceptionHandler
您将无法像使用 ProductionExceptionHandler
那样更改默认行为,使用它您只能记录错误或将消息发送到失败主题。
更新自 kafka-streams
2.8.0
自 kafka-streams
2.8.0
以来,您可以自动替换失败的流线程(由未捕获的异常引起)
使用 KafkaStreams
方法 void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);
和 StreamThreadExceptionResponse.REPLACE_THREAD
。有关详细信息,请查看 Kafka Streams Specific Uncaught Exception Handler
kafkaStreams.setUncaughtExceptionHandler(ex -> {
log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
对于消费者端的异常处理,
1) 您可以使用以下 属性.
在生产者中添加默认异常处理程序
"default.deserialization.exception.handler" = "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";
基本上apache提供了三个异常处理器类 as
1) LogAndContiuneExceptionHandler 可以作为
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
2) LogAndFailExceptionHandler
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailExceptionHandler.class);
3) LogAndSkipOnInvalidTimestamp
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndSkipOnInvalidTimestamp.class);
对于自定义异常处理,
1)您可以实现DeserializationExceptionHandler接口并覆盖handle()方法。
2) 或者你可以扩展上面提到的 类.
setUncaughtExceptionHandler 无助于处理异常,它在流由于某些未捕获的异常而终止后起作用。
Kafka 提供了几种处理异常的方法。一个简单的 try-catch{} 将有助于捕获处理器代码中的异常,但 kafka 反序列化异常(可能是由于数据问题)和生产异常(在与代理通信期间发生)需要DeserializationExceptionHandler 和 ProductionExceptionHandler 分别。默认情况下,kafka 应用程序如果遇到其中任何一个都会失败。
你可以在这个post
上找到
在 Spring 云流中,您使用以下方法配置自定义反序列化处理程序:
spring.cloud.stream.kafka.streams.binder.configuration.default.deserialization.exception.handler=你的包-name.CustomLogAndContinueExceptionHandler
CustomLogAndContinueExceptionHandler 扩展 LogAndContinueExceptionHandler 或实现 DeserializationExceptionHandler
CustomLogAndContinueExceptionHandler DeserializationHandlerResponse.CONTINUE 或 FAIL 取决于您的用例
@Slf4j
public class CustomLogAndContinueExceptionHandler extends LogAndContinueExceptionHandler {
@Override
public DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record,
Exception exception) {
.... some business logic here ....
log.error("Message failed: taskId: {}, topic: {}, partition: {}, offset: {}, , detailerror : {}",
context.taskId(), record.topic(), record.partition(), record.offset(), exception.getMessage());
return DeserializationHandlerResponse.CONTINUE;
}
}
浏览了多个帖子,但其中大部分与处理错误消息相关,而不是处理它们时的异常处理。
我想知道如何处理流应用程序收到的消息,并且在处理消息时出现异常?异常可能是由于网络故障、RuntimeException 等多种原因造成的,
- 有人可以建议正确的做法吗?我应该使用
setUncaughtExceptionHandler
?或者有更好的方法吗? - 如何处理重试?
这取决于你想对生产者端的异常做什么。
如果 producer 抛出异常(例如由于网络故障或 kafka broker 挂掉),stream 将默认挂掉。对于 kafka-streams 版本 1.1.0,您可以通过实现 ProductionExceptionHandler
来覆盖默认行为,如下所示:
public class CustomProductionExceptionHandler implements ProductionExceptionHandler {
@Override
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
log.error("Kafka message marked as processed although it failed. Message: [{}], destination topic: [{}]", new String(record.value()), record.topic(), exception);
return ProductionExceptionHandlerResponse.CONTINUE;
}
@Override
public void configure(final Map<String, ?> configs) {
}
}
从 handle 方法中,如果您不希望流因异常而死,您可以 return CONTINUE
,如果您希望流停止,则 return FAIL
(失败是默认的)。
你需要在流配置中指定这个 class:
default.production.exception.handler=com.example.CustomProductionExceptionHandler
还要注意ProductionExceptionHandler
只处理producer上的异常,不会处理流方法mapValues(..)
、filter(..)
、branch(..)
等处理消息时的异常, 你需要用 try / catch 块包装这些方法逻辑(将你所有的方法逻辑放入 try 块以保证你将处理所有异常情况):
.filter((key, value) -> { try {..} catch (Exception e) {..} })
据我所知,我们不需要显式处理消费者端的异常,因为 kafka 流将稍后自动重试消费(因为在消费和处理消息之前不会更改偏移量);例如如果 kafka broker 在一段时间内无法访问,你会从 kafka 流中得到异常,当中断时,kafka 流将消耗所有消息。所以在这种情况下,我们只会有延迟,什么都没有 corrupted/lost.
使用 setUncaughtExceptionHandler
您将无法像使用 ProductionExceptionHandler
那样更改默认行为,使用它您只能记录错误或将消息发送到失败主题。
更新自 kafka-streams
2.8.0
自 kafka-streams
2.8.0
以来,您可以自动替换失败的流线程(由未捕获的异常引起)
使用 KafkaStreams
方法 void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);
和 StreamThreadExceptionResponse.REPLACE_THREAD
。有关详细信息,请查看 Kafka Streams Specific Uncaught Exception Handler
kafkaStreams.setUncaughtExceptionHandler(ex -> {
log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
对于消费者端的异常处理,
1) 您可以使用以下 属性.
在生产者中添加默认异常处理程序"default.deserialization.exception.handler" = "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";
基本上apache提供了三个异常处理器类 as
1) LogAndContiuneExceptionHandler 可以作为
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
2) LogAndFailExceptionHandler
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailExceptionHandler.class);
3) LogAndSkipOnInvalidTimestamp
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndSkipOnInvalidTimestamp.class);
对于自定义异常处理,
1)您可以实现DeserializationExceptionHandler接口并覆盖handle()方法。
2) 或者你可以扩展上面提到的 类.
setUncaughtExceptionHandler 无助于处理异常,它在流由于某些未捕获的异常而终止后起作用。
Kafka 提供了几种处理异常的方法。一个简单的 try-catch{} 将有助于捕获处理器代码中的异常,但 kafka 反序列化异常(可能是由于数据问题)和生产异常(在与代理通信期间发生)需要DeserializationExceptionHandler 和 ProductionExceptionHandler 分别。默认情况下,kafka 应用程序如果遇到其中任何一个都会失败。
你可以在这个post
上找到在 Spring 云流中,您使用以下方法配置自定义反序列化处理程序:
spring.cloud.stream.kafka.streams.binder.configuration.default.deserialization.exception.handler=你的包-name.CustomLogAndContinueExceptionHandler
CustomLogAndContinueExceptionHandler 扩展 LogAndContinueExceptionHandler 或实现 DeserializationExceptionHandler
CustomLogAndContinueExceptionHandler DeserializationHandlerResponse.CONTINUE 或 FAIL 取决于您的用例
@Slf4j
public class CustomLogAndContinueExceptionHandler extends LogAndContinueExceptionHandler {
@Override
public DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record,
Exception exception) {
.... some business logic here ....
log.error("Message failed: taskId: {}, topic: {}, partition: {}, offset: {}, , detailerror : {}",
context.taskId(), record.topic(), record.partition(), record.offset(), exception.getMessage());
return DeserializationHandlerResponse.CONTINUE;
}
}