RetryTemplate 与 ErrorHandler 的性能比较

Performance comparison of RetryTemplate vs ErrorHandler

我有两种方法可以使用

从 KafkaListener 错误中恢复
  1. KafkaListener 方法中的 RetryTemplate:
@KafkaListener(topics: "topic1")
public void handle(command) {
    retryTemplate.execute(ctx -> {
        processCommand(command);
    });
    // Retries exhausted, execute the recoverer logic
    recover(command);
}
  1. 通过 ContainerCustomizer 将 ErrorHandler 设置为 MessageListenerContainer:
@Component
    public class ContainerCustomizer {
        public ContainerCustomizer(CustomConcurrentContainerListenerFactory factory) {
            factory.setContainerCustomizer(container -> {
                    container.setErrorHandler(new SeekToCurrentErrorHandler((ConsumerRecord<?, ?> record, Exception e) -> {
                        //logic for recoverer after retries exhausted
                        recover(convertRecord(record));
                    }, new ExponentialBackOffWithMaxRetries(2)));
            });
        }
    }

当谈到性能和阻塞消费者线程时,这两个选项如何比较? RetryTemplate.execute 的重试是在单独的线程中处理的,而 containerListener.setErrorHandler 会阻塞主消费者的线程,这是真的吗?

两者都将阻塞消费者线程 - 否则您将继续处理记录并且 Kafka 的排序保证将丢失。

此外,这两种方法都已弃用,取而代之的是 DefaultErrorHandler,这是 SeekToCurrentErrorHandler 的演变。

两者的区别在于,使用 Spring Retry,所有调用都将在内存中重试,因此您应确保聚合退避不会超过 max.poll.interval.ms,否则代理会认为您的服务器已死,将执行重新平衡。

SeekToCurrentErrorHandler,以及DefaultErrorHandler,每次重试都会向broker执行新的seek,所以你只需要确保最大延迟加执行时间不超过max.poll.interval.ms.

如果您需要 non-blocking retries,请查看 non-blocking retries 功能。