Spring Kafka Retry 是否在不同分区之间挑选消息?
Does Spring Kafka Retry pick messages amongst different partitions?
我想知道 Spring Kafka 如何处理分配给一个实例的多个分区的重试。 Spring Kafka 是根据重试策略和退避策略不断重试相同的消息,还是重试,在重试之间,是否从其他分区发送消息?
是行为:
A)重试消息->重试消息->重试消息
B)重试消息->其他消息->重试消息->重试消息
我查看了其他 Whosebug 问题,这些问题似乎证实了给定一个分区 Spring Kafka 不会移动到另一个偏移量,但是没有关于如果分配了多个分区会有什么行为的信息到实例。我已经实现了一个具有重试模板和有状态重试的工厂。
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
ListenerExceptions listenerExceptions = new ListenerExceptions();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(KafkaProperties.CONCURRENCY);
factory.getContainerProperties().setPollTimeout(KafkaProperties.POLL_TIMEOUT_VLAUE);
factory.setRetryTemplate(retryTemplate());
factory.setErrorHandler(new SeekToCurrentErrorHandler());
factory.setStatefulRetry(true);
factory.setRecoveryCallback((RetryContext context) -> listenerExceptions.recover(context));
return factory;
}
上述工厂的重试配置委托给RetryingMessageListenerAdapter
,逻辑是这样的:
public void onMessage(final ConsumerRecord<K, V> record, final Acknowledgment acknowledgment,
final Consumer<?, ?> consumer) {
RetryState retryState = null;
if (this.stateful) {
retryState = new DefaultRetryState(record.topic() + "-" + record.partition() + "-" + record.offset());
}
getRetryTemplate().execute(context -> {
context.setAttribute(CONTEXT_RECORD, record);
switch (RetryingMessageListenerAdapter.this.delegateType) {
case ACKNOWLEDGING_CONSUMER_AWARE:
context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
context.setAttribute(CONTEXT_CONSUMER, consumer);
RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment, consumer);
break;
case ACKNOWLEDGING:
context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment);
break;
case CONSUMER_AWARE:
context.setAttribute(CONTEXT_CONSUMER, consumer);
RetryingMessageListenerAdapter.this.delegate.onMessage(record, consumer);
break;
case SIMPLE:
RetryingMessageListenerAdapter.this.delegate.onMessage(record);
}
return null;
},
getRecoveryCallback(), retryState);
}
因此,我们会重试每条消息。根据 Apache Kafka 的建议,我们在一个线程中处理一个分区,因此在重试耗尽或调用成功之前,不会处理该分区中的每条下一条记录。
根据您的多分区情况和factory.setConcurrency(KafkaProperties.CONCURRENCY);
配置,可能是不同的分区在不同的线程中处理。因此,可能会同时重试来自不同分区的不同记录。只是因为重试绑定到线程和调用堆栈。
我想知道 Spring Kafka 如何处理分配给一个实例的多个分区的重试。 Spring Kafka 是根据重试策略和退避策略不断重试相同的消息,还是重试,在重试之间,是否从其他分区发送消息?
是行为:
A)重试消息->重试消息->重试消息
B)重试消息->其他消息->重试消息->重试消息
我查看了其他 Whosebug 问题,这些问题似乎证实了给定一个分区 Spring Kafka 不会移动到另一个偏移量,但是没有关于如果分配了多个分区会有什么行为的信息到实例。我已经实现了一个具有重试模板和有状态重试的工厂。
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
ListenerExceptions listenerExceptions = new ListenerExceptions();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(KafkaProperties.CONCURRENCY);
factory.getContainerProperties().setPollTimeout(KafkaProperties.POLL_TIMEOUT_VLAUE);
factory.setRetryTemplate(retryTemplate());
factory.setErrorHandler(new SeekToCurrentErrorHandler());
factory.setStatefulRetry(true);
factory.setRecoveryCallback((RetryContext context) -> listenerExceptions.recover(context));
return factory;
}
上述工厂的重试配置委托给RetryingMessageListenerAdapter
,逻辑是这样的:
public void onMessage(final ConsumerRecord<K, V> record, final Acknowledgment acknowledgment,
final Consumer<?, ?> consumer) {
RetryState retryState = null;
if (this.stateful) {
retryState = new DefaultRetryState(record.topic() + "-" + record.partition() + "-" + record.offset());
}
getRetryTemplate().execute(context -> {
context.setAttribute(CONTEXT_RECORD, record);
switch (RetryingMessageListenerAdapter.this.delegateType) {
case ACKNOWLEDGING_CONSUMER_AWARE:
context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
context.setAttribute(CONTEXT_CONSUMER, consumer);
RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment, consumer);
break;
case ACKNOWLEDGING:
context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment);
break;
case CONSUMER_AWARE:
context.setAttribute(CONTEXT_CONSUMER, consumer);
RetryingMessageListenerAdapter.this.delegate.onMessage(record, consumer);
break;
case SIMPLE:
RetryingMessageListenerAdapter.this.delegate.onMessage(record);
}
return null;
},
getRecoveryCallback(), retryState);
}
因此,我们会重试每条消息。根据 Apache Kafka 的建议,我们在一个线程中处理一个分区,因此在重试耗尽或调用成功之前,不会处理该分区中的每条下一条记录。
根据您的多分区情况和factory.setConcurrency(KafkaProperties.CONCURRENCY);
配置,可能是不同的分区在不同的线程中处理。因此,可能会同时重试来自不同分区的不同记录。只是因为重试绑定到线程和调用堆栈。