Spring 卡夫卡 |如何使 DeserializationException 可重试?

Spring Kafka | How to make DeserializationException retryable?

我有一个以 Avro 格式发送消息的生产者和一个收听这些消息的消费者。

我还通过在我的消费者中使用 @RetryableTopic 来处理错误来实现非阻塞重试。

当消费者无法反序列化消息时(由于架构更改或其他原因),它不会将该消息放入 -retry 主题中。它直接将其发送到 -dlt 主题。

我也希望重试 DeserializationExceptions。原因是在重试这些错误时,我可以在我的消费者中部署一个修复程序,以便重试最终能够成功。

我在 @RetryableTopic 中尝试了 include 选项,但它似乎不适用于 DeserializationException

  @RetryableTopic(
    attempts = "${app.consumer.retry.topic.count:5}",
    backoff = @Backoff(delayExpression = "${app.consumer.retry.topic.back-off:2000}"),
    fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
    include = {DeserializationException.class}  // does not work
  )

这是 @RetryableTopic 中的错误还是有其他方法可以实现?

因为 Spring Kafka 2.8.3 有一组 global fatal exceptions,正如您所描述的,将导致记录直接转发到 DLT

处理此类异常的通常模式是,在部署修复程序后,使用某种控制台应用程序从 DLT 检索失败的记录并重新处理它,可能是通过发送记录回到第一个重试主题,这样主题中就没有重复了。

对于您描述的模式,您可以通过提供 DestinationTopicResolver bean 来管理这组全局 FATAL 异常,例如:

@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver topicResolver(ApplicationContext applicationContext) {
    DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(Clock.systemUTC(), applicationContext);
    ddtr.removeClassification(DeserializationException.class);
    return ddtr;
}

请告诉我这是否适合您。谢谢。

以下是我们是如何实现的:

  @Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
  public DestinationTopicResolver destinationTopicResolver(ApplicationContext context) {
    DefaultDestinationTopicResolver resolver = new DefaultDestinationTopicResolver(systemUTC(), context);
    resolver.setClassifications(emptyMap(), true);
    return resolver;
  }

这样我们就不必一一指定要包含的每个异常。另一种解决方案是 Tomaz 所建议的:

  @Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
  public DestinationTopicResolver topicResolver(ApplicationContext applicationContext) {
    DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(systemUTC(), applicationContext);
    ddtr.removeClassification(DeserializationException.class);
    ddtr.removeClassification(ClassCastException.class);
    return ddtr;
  }