Exponential backoff with message order guarantee using spring-kafka
I'm trying to implement a Spring Boot based Kafka consumer with some very strong message delivery guarantees, even in the case of errors:
- messages from a partition must be processed in order,
- if processing of a message fails, consumption of that particular partition should be suspended,
- processing should be retried with a backoff until it succeeds.
Our current implementation fulfills these requirements:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRetryTemplate(retryTemplate());

    final ContainerProperties containerProperties = factory.getContainerProperties();
    containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
    containerProperties.setErrorHandler(errorHandler());

    return factory;
}

@Bean
public RetryTemplate retryTemplate() {
    final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(1000);
    backOffPolicy.setMultiplier(1.5);

    final RetryTemplate template = new RetryTemplate();
    template.setRetryPolicy(new AlwaysRetryPolicy());
    template.setBackOffPolicy(backOffPolicy);

    return template;
}

@Bean
public ErrorHandler errorHandler() {
    return new SeekToCurrentErrorHandler();
}
However, with this setup the record is locked by the consumer indefinitely. At some point the processing time will exceed max.poll.interval.ms, the broker will reassign the partition to another consumer, and a duplicate is created.
Assuming max.poll.interval.ms equal to 5 minutes (the default) and a failure lasting 30 minutes, this will cause the message to be processed roughly 6 times.
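A partial mitigation (not part of the configuration above) is to raise max.poll.interval.ms so the rebalance window is wider than the expected outage; with a 30-minute value, the 30-minute failure above would cause roughly one redelivery instead of about six. A minimal sketch of a consumerFactory() bean carrying that property (the question's own consumerFactory() is not shown; the broker address, group id and String deserializers are placeholders, and the 30-minute value is only an illustration):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // placeholder
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // 30 minutes instead of the 5-minute default: the broker waits this long between
    // polls before it considers the consumer dead and rebalances its partitions.
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
            (int) TimeUnit.MINUTES.toMillis(30));
    return new DefaultKafkaConsumerFactory<>(props);
}

This only delays the rebalance; failures longer than the configured interval still produce duplicates.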
Another possibility would be to return the message to the queue after N retries (e.g. 3 attempts) by using a SimpleRetryPolicy. The message would then be replayed (thanks to the SeekToCurrentErrorHandler) and processing would start from scratch, again with up to 5 attempts. This results in delays forming a series such as
10 secs -> 30 secs -> 90 secs -> 10 secs -> 30 secs -> 90 secs -> ...
which is less desirable than a constantly rising one :)
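For reference, a rough sketch of that second option (not the configuration currently in use); the 10-second initial interval, multiplier of 3 and maxAttempts of 3 are assumptions chosen to roughly match the series above:

import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Bean
public RetryTemplate boundedRetryTemplate() {
    final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(10_000);  // 10 s, multiplied by 3 on each further retry
    backOffPolicy.setMultiplier(3.0);

    final RetryTemplate template = new RetryTemplate();
    // At most 3 attempts per delivery; afterwards the exception propagates,
    // SeekToCurrentErrorHandler replays the record, and the backoff starts over at 10 s.
    template.setRetryPolicy(new SimpleRetryPolicy(3));
    template.setBackOffPolicy(backOffPolicy);
    return template;
}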
Is there any third scenario which could keep the delays forming an ascending series and, at the same time, avoid creating duplicates in the example above?
It can be done with stateful retry - in which case the exception is thrown after each retry attempt, but the state is maintained in the retry state object, so the next delivery of that message will use the next delay, and so on.
This requires something in the message (e.g. a header) to uniquely identify each message. Fortunately, with Kafka, the topic, partition and offset provide that unique key for the state.
However, currently, the RetryingMessageListenerAdapter does not support stateful retry.
You could disable retry in the listener container factory and use a stateful RetryTemplate in your listener, using one of the execute methods that take a RetryState argument.
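In outline, that looks roughly like the sketch below (the topic name, the record-key format, the process()/recover() helpers and the retryTemplate field configured with your backoff policy are all placeholders; the full test case further down shows the same idea in context):

@KafkaListener(topics = "myTopic")
public void listen(String in,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) long offset) {
    // topic + partition + offset uniquely identify the record, so they key the retry state;
    // the exception from process() propagates out of execute(), SeekToCurrentErrorHandler
    // replays the record, and the cached state makes the next delivery use the next delay.
    RetryState retryState = new DefaultRetryState(topic + "-" + partition + "-" + offset);
    retryTemplate.execute(context -> {
        process(in);   // placeholder for the real work; throws on failure
        return null;
    }, context -> {
        recover(in);   // placeholder recoverer, invoked once the retry policy is exhausted
        return null;
    }, retryState);
}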
Feel free to open a GitHub issue against the framework to add support for stateful retry; contributions are welcome! - pull request issued.
EDIT
I just wrote a test case to demonstrate using stateful recovery with a @KafkaListener ...
/*
 * Copyright 2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.annotation;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.RetryState;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.DefaultRetryState;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author Gary Russell
 * @since 5.0
 *
 */
@RunWith(SpringRunner.class)
@DirtiesContext
public class StatefulRetryTests {

    private static final String DEFAULT_TEST_GROUP_ID = "statefulRetry";

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, "sr1");

    @Autowired
    private Config config;

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @Test
    public void testStatefulRetry() throws Exception {
        this.template.send("sr1", "foo");
        assertThat(this.config.listener1().latch1.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(this.config.listener1().latch2.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(this.config.listener1().result).isTrue();
    }

    @Configuration
    @EnableKafka
    public static class Config {

        @Bean
        public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
            return factory;
        }

        @Bean
        public DefaultKafkaConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> consumerProps =
                    KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka);
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return consumerProps;
        }

        @Bean
        public KafkaTemplate<Integer, String> template() {
            KafkaTemplate<Integer, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
            return kafkaTemplate;
        }

        @Bean
        public ProducerFactory<Integer, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        @Bean
        public Map<String, Object> producerConfigs() {
            return KafkaTestUtils.producerProps(embeddedKafka);
        }

        @Bean
        public Listener listener1() {
            return new Listener();
        }

    }

    public static class Listener {

        private static final RetryTemplate retryTemplate = new RetryTemplate();

        private static final ConcurrentMap<String, RetryState> states = new ConcurrentHashMap<>();

        static {
            ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
            retryTemplate.setBackOffPolicy(backOff);
        }

        private final CountDownLatch latch1 = new CountDownLatch(3);

        private final CountDownLatch latch2 = new CountDownLatch(1);

        private volatile boolean result;

        @KafkaListener(topics = "sr1", groupId = "sr1")
        public void listen1(final String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                @Header(KafkaHeaders.OFFSET) long offset) {
            String recordKey = topic + partition + offset;
            RetryState retryState = states.get(recordKey);
            if (retryState == null) {
                retryState = new DefaultRetryState(recordKey);
                states.put(recordKey, retryState);
            }
            this.result = retryTemplate.execute(c -> {
                // do your work here
                this.latch1.countDown();
                throw new RuntimeException("retry");
            }, c -> {
                latch2.countDown();
                return true;
            }, retryState);
            states.remove(recordKey);
        }

    }

}
and
Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void org.springframework.kafka.annotation.StatefulRetryTests$Listener.listen1(java.lang.String,java.lang.String,int,long)' threw exception; nested exception is java.lang.RuntimeException: retry
after each delivery attempt.
In this case, I added a recoverer to handle the message after the retries are exhausted. You could do something else instead, such as stopping the container (but do it on a separate thread, like we do in the ContainerStoppingErrorHandler).
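For that container-stopping variant, a minimal sketch (reusing a factory like the one in the question; ContainerStoppingErrorHandler lives in org.springframework.kafka.listener and performs the stop on a separate thread internally):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> stoppingContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // Instead of recovering the failed record, stop the listener container; the handler
    // does the stop on a separate thread so the consumer thread is not blocked.
    factory.getContainerProperties().setErrorHandler(new ContainerStoppingErrorHandler());
    return factory;
}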