当下游系统出现异常时,我们如何暂停 Kafka 消费者 polling/processing 记录

How can we pause Kafka consumer polling/processing records when there is an exception because of downstream system

我正在使用 spring boot 2.1.7.RELEASE 和 spring-kafka 2.2.8.RELEASE.And 我正在使用 @KafkaListener 注释创建一个消费者,我我正在使用消费者的所有默认设置。

现在,在我的消费者中,处理逻辑包括一个数据库调用,如果在处理过程中有 error/exception,我会将记录发送到 DLT。

使用此设置,如果数据库由于某种原因停机几分钟,我希望 pause/stop 我的消费者不要使用更多记录,否则它会继续使用消息并获得数据库异常,并且最终填满我不想做的 DLT,除非数据库返回(基于一些健康检查)。

现在我有几个问题了。

  1. spring-kafka 是否提供基于异常类型触发无限重试的选项(在这种情况下是数据库异常,但我想根据我的消费者逻辑添加更多类型的异常)
    1. spring-kafka 是否提供了根据条件触发消息消费的选项?

有一个 ContainerStoppingErrorHandler 但它会停止所有异常的容器。

您需要创建一个自定义错误处理程序来在特定故障后停止(或暂停)容器,以及一些重新启动(或恢复)容器的机制。