Why am I losing messages when using Kafka with EOS Beta?
I am using spring-kafka 2.5.5.RELEASE and I am losing messages when using Kafka with exactly-once semantics (EOS).
Configuration
My listener uses @KafkaListener with this container configuration:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaBatchListenerContainerFactory(
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        AfterRollbackProcessor<Object, Object> afterRollbackProcessor,
        KafkaTransactionManager<Object, Object> kafkaTransactionManager,
        ConsumerRecordRecoverer myRecoverer
) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(kafkaConsumerFactory);
    factory.setAfterRollbackProcessor(afterRollbackProcessor);
    factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
    factory.getContainerProperties().setEosMode(ContainerProperties.EOSMode.BETA);
    factory.setBatchListener(true);
    factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>(myRecoverer));
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
    return factory;
}
@Bean
public AfterRollbackProcessor<?, ?> afterRollbackProcessor(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    return new DefaultAfterRollbackProcessor<>(deadLetterPublishingRecoverer);
}
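The DeadLetterPublishingRecoverer bean itself is not shown above; a typical definition (just a sketch, assuming a transactional KafkaTemplate is available for publishing to the DLT; the bean name is illustrative) would look like this:

@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaTemplate<Object, Object> template) {
    // By default, publishes the failed record to <originalTopic>.DLT
    return new DeadLetterPublishingRecoverer(template);
}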
Properties:
spring.kafka.producer.acks = all
spring.kafka.consumer.isolation-level = read_committed
spring.kafka.consumer.group-id = ap_u2
# Use hostname to generate unique transaction id per instance
spring.kafka.producer.transaction-id-prefix = tx_ap_u2_${HOSTNAME}_
The topic has 3 partitions and I run 3 instances of the application, each one consuming a single partition.
Problem
I am losing some messages in a read-process-write scenario.
The output topic does not receive the processed messages.
The DLT is empty and the listener keeps processing messages.
The lost messages correlate with these error logs:
Nov 2, 2020 @ 16:04:23.696
org.springframework.kafka.core.DefaultKafkaProducerFactory
commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@2a8419e]
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
Nov 2, 2020 @ 16:04:23.788
org.springframework.kafka.core.DefaultKafkaProducerFactory
Error during some operation; producer removed from cache: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@2a8419e]
Nov 2, 2020 @ 16:04:23.979
org.springframework.kafka.support.LoggingProducerListener
Exception thrown when sending a message with key='XXX' and payload='XXX' to topic XXX:
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
Nov 2, 2020 @ 16:04:23.981
org.apache.kafka.clients.producer.internals.Sender
[Producer clientId=producer-XXXXXX.0, transactionalId=XXXXXX.0] Aborting producer batches due to fatal error
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
26 times on different messages:
Nov 2, 2020 @ 16:04:23.985 to Nov 2, 2020 @ 16:04:24.787
org.springframework.kafka.support.LoggingProducerListener
Exception thrown when sending a message with key='XXX' and payload='XXX' to topic XXX:
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
This log, 3 times:
Nov 2, 2020 @ 16:04:24.893
org.apache.kafka.clients.NetworkClient
[Producer clientId=producer-XXXXXX.0, transactionalId=XXXXXX.0] Uncaught error in request completion:
java.lang.IllegalStateException: Should not reopen a batch which is already aborted.
at org.apache.kafka.common.record.MemoryRecordsBuilder.reopenAndRewriteProducerState(MemoryRecordsBuilder.java:295)
6 times:
Nov 2, 2020 @ 16:04:24.890 to Nov 2, 2020 @ 16:04:24.887
org.springframework.kafka.support.LoggingProducerListener
Exception thrown when sending a message with key='XXX' and payload='XXX' to topic XXX:
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
Nov 2, 2020 @ 16:04:25.085
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer
Producer or 'group.instance.id' fenced during transaction
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
Question
Why did the read-process-write transactional listener with EOS.Beta semantics lose 244 messages?
There is no further information apart from the transaction having been fenced.
Thanks
Producer or 'group.instance.id' fenced during transaction
For a ProducerFencedException, the after-rollback processor is not invoked, because it means the consumer has already lost those partitions. You should see rebalance activity after that error. Since the offsets were not sent to the transaction, the same records should be redelivered, to this instance or to another one.
If you do not see a rebalance after that error, then something strange is indeed going on.
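If it helps to verify that, rebalance activity can be made visible by registering a rebalance listener on the container. This is just a sketch (the class name and the logging are mine, not part of the original configuration):

import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;

// Logs partition revocations/assignments so fencing-related rebalances show up in the output
public class LoggingRebalanceListener implements ConsumerAwareRebalanceListener {

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        System.out.println("Partitions revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        System.out.println("Partitions assigned: " + partitions);
    }
}

Register it on the factory with factory.getContainerProperties().setConsumerRebalanceListener(new LoggingRebalanceListener()).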
EDIT
I just wrote a small test application and it behaves exactly as expected - running 2 replicas, I get the same error as you, and it keeps failing on the same record.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class So64665725Application {

    public static void main(String[] args) {
        SpringApplication.run(So64665725Application.class, args);
    }

    @Autowired
    KafkaTemplate<String, String> template;

    @KafkaListener(id = "so64665725", topics = "so64665725-1")
    public void listen(String in) throws Exception {
        System.out.println(in);
        Thread.sleep(15000); // exceed max.poll.interval.ms (10s below) to force a rebalance and fence the producer
        try {
            System.out.println(this.template.send("so64665725-2", in.toUpperCase()).get(10, TimeUnit.SECONDS)
                    .getRecordMetadata());
        }
        catch (ExecutionException e) {
            e.getCause().printStackTrace();
            throw (Exception) e.getCause();
        }
        catch (TimeoutException e) {
            throw e;
        }
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so64665725-1").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so64665725-2").partitions(1).replicas(1).build();
    }
}
spring.kafka.consumer.properties.max.poll.interval.ms=10000
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.acks=all
spring.kafka.producer.transaction-id-prefix=tx-
This is with Boot 2.4.0-RC1 and SK 2.6.2, where EOSMode.BETA is the default.
This is a known bug in Kafka: KAFKA-9803
A workaround is implemented in spring-kafka 2.5.8.RELEASE and 2.6.3.RELEASE.
See the stopContainerWhenFenced option.
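For reference, enabling that option on the factory from the question would look something like this (a sketch, assuming an upgrade to spring-kafka 2.5.8.RELEASE or later, where the property exists):

// Stop the container instead of retrying when the producer is fenced,
// so the KAFKA-9803 condition is surfaced rather than silently looping
factory.getContainerProperties().setStopContainerWhenFenced(true);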