Spring-Kafka Producer 在所有 broker 都宕机时重试
Spring-Kafka Producer Retry when all brokers are down
我正在使用 Springboot 2.3。5.RELEASE 以及 spring-kafka 2.6.3。我正在尝试做一个简单的 Kafka Producer Retry POC,这应该会导致生产者在代理关闭时重试,或者如果在将消息发送到代理之前抛出异常。
以下生产者配置适用于启用重试的幂等生产者。
// Producer configuration
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");
props.put(ProducerConfig.RETRIES_CONFIG, "10");
props.put(ProducerConfig.ACKS_CONFIG, "all");
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
... other configs
我知道 Kafka 生产者重试仅在代理向生产者发送确认时出现暂时性错误时才有效。因此,如果代理甚至在消息发送之前就已关闭,则上述重试将不起作用。
因此我尝试用 spring-kafka 引入@Retry,不幸的是我也无法让它工作。有谁知道如何解决这个问题,因为这似乎是一个常见的用例,比如如果代理出现故障或者出现网络故障我不希望我的制作人停止工作
@Transaction
@Retryable(value = Exception.class, maxAttemptsExpression = "10",
backoff = @Backoff(delayExpression = "1000"))
public void sendTestMessage(String name, String number) throws RuntimeException {
TestObject testObj = new TestObject(name + ": " + String.valueOf(number),
Double.valueOf(Math.random() * 10000).longValue(),
"Blah" + String.valueOf(Math.random() * 10));
log.info("Sending sendTestMessage {}", name);
throwSomeException();
// sends the message
kafkaTemplate.send(MyConstants.SomeTOPIC, testObj);
}
// Throwing mock exception
public void throwSomeException() throws RuntimeException {
log.info("Throwing mock exception {}");
throw new RuntimeException("Mock Exception thrown while sending the
sendTestMessage method");
}
@Recover
public void recover(Exception e, TestObject testObj) {
log.info("Recovered exception thrown mock exception {} with payload as {}", e, testObj);
}
有没有重试可以解决上述问题。我发现很难相信默认情况下没有内置这样的功能。
我解决了这个问题,我发布这个是为了让其他人受益。
我的配置中缺少以下内容 class
@Configuration
@EnableRetry <-- this was missing
public class ProducerConfig {
...
添加以上内容后,一切正常。
我正在使用 Springboot 2.3。5.RELEASE 以及 spring-kafka 2.6.3。我正在尝试做一个简单的 Kafka Producer Retry POC,这应该会导致生产者在代理关闭时重试,或者如果在将消息发送到代理之前抛出异常。
以下生产者配置适用于启用重试的幂等生产者。
// Producer configuration
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");
props.put(ProducerConfig.RETRIES_CONFIG, "10");
props.put(ProducerConfig.ACKS_CONFIG, "all");
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
... other configs
我知道 Kafka 生产者重试仅在代理向生产者发送确认时出现暂时性错误时才有效。因此,如果代理甚至在消息发送之前就已关闭,则上述重试将不起作用。 因此我尝试用 spring-kafka 引入@Retry,不幸的是我也无法让它工作。有谁知道如何解决这个问题,因为这似乎是一个常见的用例,比如如果代理出现故障或者出现网络故障我不希望我的制作人停止工作
@Transaction
@Retryable(value = Exception.class, maxAttemptsExpression = "10",
backoff = @Backoff(delayExpression = "1000"))
public void sendTestMessage(String name, String number) throws RuntimeException {
TestObject testObj = new TestObject(name + ": " + String.valueOf(number),
Double.valueOf(Math.random() * 10000).longValue(),
"Blah" + String.valueOf(Math.random() * 10));
log.info("Sending sendTestMessage {}", name);
throwSomeException();
// sends the message
kafkaTemplate.send(MyConstants.SomeTOPIC, testObj);
}
// Throwing mock exception
public void throwSomeException() throws RuntimeException {
log.info("Throwing mock exception {}");
throw new RuntimeException("Mock Exception thrown while sending the
sendTestMessage method");
}
@Recover
public void recover(Exception e, TestObject testObj) {
log.info("Recovered exception thrown mock exception {} with payload as {}", e, testObj);
}
有没有重试可以解决上述问题。我发现很难相信默认情况下没有内置这样的功能。
我解决了这个问题,我发布这个是为了让其他人受益。 我的配置中缺少以下内容 class
@Configuration
@EnableRetry <-- this was missing
public class ProducerConfig {
...
添加以上内容后,一切正常。