RetryTemplate 与 ErrorHandler 的性能比较
Performance comparison of RetryTemplate vs ErrorHandler
我有两种方法可以使用
从 KafkaListener 错误中恢复
KafkaListener
方法中的 RetryTemplate:
@KafkaListener(topics: "topic1")
public void handle(command) {
retryTemplate.execute(ctx -> {
processCommand(command);
});
// Retries exhausted, execute the recoverer logic
recover(command);
}
- 通过 ContainerCustomizer 将 ErrorHandler 设置为 MessageListenerContainer:
@Component
public class ContainerCustomizer {
public ContainerCustomizer(CustomConcurrentContainerListenerFactory factory) {
factory.setContainerCustomizer(container -> {
container.setErrorHandler(new SeekToCurrentErrorHandler((ConsumerRecord<?, ?> record, Exception e) -> {
//logic for recoverer after retries exhausted
recover(convertRecord(record));
}, new ExponentialBackOffWithMaxRetries(2)));
});
}
}
当谈到性能和阻塞消费者线程时,这两个选项如何比较? RetryTemplate.execute
的重试是在单独的线程中处理的,而 containerListener.setErrorHandler
会阻塞主消费者的线程,这是真的吗?
两者都将阻塞消费者线程 - 否则您将继续处理记录并且 Kafka 的排序保证将丢失。
此外,这两种方法都已弃用,取而代之的是 DefaultErrorHandler
,这是 SeekToCurrentErrorHandler
的演变。
两者的区别在于,使用 Spring Retry
,所有调用都将在内存中重试,因此您应确保聚合退避不会超过 max.poll.interval.ms
,否则代理会认为您的服务器已死,将执行重新平衡。
SeekToCurrentErrorHandler
,以及DefaultErrorHandler
,每次重试都会向broker执行新的seek,所以你只需要确保最大延迟加执行时间不超过max.poll.interval.ms
.
如果您需要 non-blocking retries
,请查看 non-blocking retries 功能。
我有两种方法可以使用
从 KafkaListener 错误中恢复KafkaListener
方法中的 RetryTemplate:
@KafkaListener(topics: "topic1")
public void handle(command) {
retryTemplate.execute(ctx -> {
processCommand(command);
});
// Retries exhausted, execute the recoverer logic
recover(command);
}
- 通过 ContainerCustomizer 将 ErrorHandler 设置为 MessageListenerContainer:
@Component
public class ContainerCustomizer {
public ContainerCustomizer(CustomConcurrentContainerListenerFactory factory) {
factory.setContainerCustomizer(container -> {
container.setErrorHandler(new SeekToCurrentErrorHandler((ConsumerRecord<?, ?> record, Exception e) -> {
//logic for recoverer after retries exhausted
recover(convertRecord(record));
}, new ExponentialBackOffWithMaxRetries(2)));
});
}
}
当谈到性能和阻塞消费者线程时,这两个选项如何比较? RetryTemplate.execute
的重试是在单独的线程中处理的,而 containerListener.setErrorHandler
会阻塞主消费者的线程,这是真的吗?
两者都将阻塞消费者线程 - 否则您将继续处理记录并且 Kafka 的排序保证将丢失。
此外,这两种方法都已弃用,取而代之的是 DefaultErrorHandler
,这是 SeekToCurrentErrorHandler
的演变。
两者的区别在于,使用 Spring Retry
,所有调用都将在内存中重试,因此您应确保聚合退避不会超过 max.poll.interval.ms
,否则代理会认为您的服务器已死,将执行重新平衡。
SeekToCurrentErrorHandler
,以及DefaultErrorHandler
,每次重试都会向broker执行新的seek,所以你只需要确保最大延迟加执行时间不超过max.poll.interval.ms
.
如果您需要 non-blocking retries
,请查看 non-blocking retries 功能。