Java Spring Kafka 模板生产者在代理重启时丢失消息
Java Spring Kafka Template producer lost messages on broker restart
我正在使用 spring-boot (2.1.6.RELEASE) 和 spring-kafka (2.2.7.RELEASE),我正在向我的使用 KafkaTemplate 的 kafka 集群。但有时(通常是当我重新启动 kafka-broker 或进行重新平衡时)我在发送消息时看到这样的错误:
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
由于默认的 Kafka 生产者配置,我希望重试发送失败,但事实并非如此。默认 Kafka 生产者配置:
retries: 2147483647 (https://kafka.apache.org/documentation/#retries)
acks: 1 (https://kafka.apache.org/documentation/#acks)
我的配置是这样的:
@Bean
public Map<String, Object> producerConfigs()
{
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
return props;
}
@Bean
public ProducerFactory<Long, String> producerFactory()
{
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<Long, String> kafkaTemplate(KafkaTemplateProducerListener<Long, String> kafkaTemplateProducerListener,
ProducerFactory<Long, String> producerFactory)
{
KafkaTemplate<Long, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setProducerListener(kafkaTemplateProducerListener);
return kafkaTemplate;
}
我正在发送这样的消息:
kafkaTemplate.send(topicName, key, body);
我在整个互联网上搜索过,每个人都说这种带有重试和确认的配置必须有效,但实际上没有。我缺少什么?
谢谢
在花了一些时间调试后我找到了解决方案:
props.put(ProducerConfig.ACKS_CONFIG, "all");
有关此 属性 的更多信息:https://kafka.apache.org/documentation/#acks
非常好的博客,展示了您可能会在 kafka 中丢失消息的不同场景:
- 第 1 部分:https://jack-vanlightly.com/blog/2018/9/14/how-to-lose-messages-on-a-kafka-cluster-part1
- 第 2 部分:https://jack-vanlightly.com/blog/2018/9/18/how-to-lose-messages-on-a-kafka-cluster-part-2
旁注 - 来自的回答我发现如果你不想在关机时丢失消息,使用这个是个好主意:
@PreDestroy
public void flush()
{
kafkaTemplate.flush();
}
我正在使用 spring-boot (2.1.6.RELEASE) 和 spring-kafka (2.2.7.RELEASE),我正在向我的使用 KafkaTemplate 的 kafka 集群。但有时(通常是当我重新启动 kafka-broker 或进行重新平衡时)我在发送消息时看到这样的错误:
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
由于默认的 Kafka 生产者配置,我希望重试发送失败,但事实并非如此。默认 Kafka 生产者配置:
retries: 2147483647 (https://kafka.apache.org/documentation/#retries)
acks: 1 (https://kafka.apache.org/documentation/#acks)
我的配置是这样的:
@Bean
public Map<String, Object> producerConfigs()
{
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
return props;
}
@Bean
public ProducerFactory<Long, String> producerFactory()
{
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<Long, String> kafkaTemplate(KafkaTemplateProducerListener<Long, String> kafkaTemplateProducerListener,
ProducerFactory<Long, String> producerFactory)
{
KafkaTemplate<Long, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setProducerListener(kafkaTemplateProducerListener);
return kafkaTemplate;
}
我正在发送这样的消息:
kafkaTemplate.send(topicName, key, body);
我在整个互联网上搜索过,每个人都说这种带有重试和确认的配置必须有效,但实际上没有。我缺少什么?
谢谢
在花了一些时间调试后我找到了解决方案:
props.put(ProducerConfig.ACKS_CONFIG, "all");
有关此 属性 的更多信息:https://kafka.apache.org/documentation/#acks
非常好的博客,展示了您可能会在 kafka 中丢失消息的不同场景:
- 第 1 部分:https://jack-vanlightly.com/blog/2018/9/14/how-to-lose-messages-on-a-kafka-cluster-part1
- 第 2 部分:https://jack-vanlightly.com/blog/2018/9/18/how-to-lose-messages-on-a-kafka-cluster-part-2
旁注 - 来自
@PreDestroy
public void flush()
{
kafkaTemplate.flush();
}