Spring 卡夫卡 |如何使 DeserializationException 可重试?
Spring Kafka | How to make DeserializationException retryable?
我有一个以 Avro 格式发送消息的生产者和一个收听这些消息的消费者。
我还通过在我的消费者中使用 @RetryableTopic
来处理错误来实现非阻塞重试。
当消费者无法反序列化消息时(由于架构更改或其他原因),它不会将该消息放入 -retry
主题中。它直接将其发送到 -dlt
主题。
我也希望重试 DeserializationException
s。原因是在重试这些错误时,我可以在我的消费者中部署一个修复程序,以便重试最终能够成功。
我在 @RetryableTopic
中尝试了 include
选项,但它似乎不适用于 DeserializationException
。
@RetryableTopic(
attempts = "${app.consumer.retry.topic.count:5}",
backoff = @Backoff(delayExpression = "${app.consumer.retry.topic.back-off:2000}"),
fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
include = {DeserializationException.class} // does not work
)
这是 @RetryableTopic
中的错误还是有其他方法可以实现?
因为 Spring Kafka 2.8.3
有一组 global fatal exceptions,正如您所描述的,将导致记录直接转发到 DLT
。
处理此类异常的通常模式是,在部署修复程序后,使用某种控制台应用程序从 DLT
检索失败的记录并重新处理它,可能是通过发送记录回到第一个重试主题,这样主题中就没有重复了。
对于您描述的模式,您可以通过提供 DestinationTopicResolver
bean 来管理这组全局 FATAL
异常,例如:
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver topicResolver(ApplicationContext applicationContext) {
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(Clock.systemUTC(), applicationContext);
ddtr.removeClassification(DeserializationException.class);
return ddtr;
}
请告诉我这是否适合您。谢谢。
以下是我们是如何实现的:
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver destinationTopicResolver(ApplicationContext context) {
DefaultDestinationTopicResolver resolver = new DefaultDestinationTopicResolver(systemUTC(), context);
resolver.setClassifications(emptyMap(), true);
return resolver;
}
这样我们就不必一一指定要包含的每个异常。另一种解决方案是 Tomaz 所建议的:
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver topicResolver(ApplicationContext applicationContext) {
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(systemUTC(), applicationContext);
ddtr.removeClassification(DeserializationException.class);
ddtr.removeClassification(ClassCastException.class);
return ddtr;
}
我有一个以 Avro 格式发送消息的生产者和一个收听这些消息的消费者。
我还通过在我的消费者中使用 @RetryableTopic
来处理错误来实现非阻塞重试。
当消费者无法反序列化消息时(由于架构更改或其他原因),它不会将该消息放入 -retry
主题中。它直接将其发送到 -dlt
主题。
我也希望重试 DeserializationException
s。原因是在重试这些错误时,我可以在我的消费者中部署一个修复程序,以便重试最终能够成功。
我在 @RetryableTopic
中尝试了 include
选项,但它似乎不适用于 DeserializationException
。
@RetryableTopic(
attempts = "${app.consumer.retry.topic.count:5}",
backoff = @Backoff(delayExpression = "${app.consumer.retry.topic.back-off:2000}"),
fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
include = {DeserializationException.class} // does not work
)
这是 @RetryableTopic
中的错误还是有其他方法可以实现?
因为 Spring Kafka 2.8.3
有一组 global fatal exceptions,正如您所描述的,将导致记录直接转发到 DLT
。
处理此类异常的通常模式是,在部署修复程序后,使用某种控制台应用程序从 DLT
检索失败的记录并重新处理它,可能是通过发送记录回到第一个重试主题,这样主题中就没有重复了。
对于您描述的模式,您可以通过提供 DestinationTopicResolver
bean 来管理这组全局 FATAL
异常,例如:
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver topicResolver(ApplicationContext applicationContext) {
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(Clock.systemUTC(), applicationContext);
ddtr.removeClassification(DeserializationException.class);
return ddtr;
}
请告诉我这是否适合您。谢谢。
以下是我们是如何实现的:
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver destinationTopicResolver(ApplicationContext context) {
DefaultDestinationTopicResolver resolver = new DefaultDestinationTopicResolver(systemUTC(), context);
resolver.setClassifications(emptyMap(), true);
return resolver;
}
这样我们就不必一一指定要包含的每个异常。另一种解决方案是 Tomaz 所建议的:
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver topicResolver(ApplicationContext applicationContext) {
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(systemUTC(), applicationContext);
ddtr.removeClassification(DeserializationException.class);
ddtr.removeClassification(ClassCastException.class);
return ddtr;
}