Why did I lose messages when using Kafka with EOS Beta?

I'm using spring-kafka 2.5.5.RELEASE, and I'm losing messages when using Kafka with exactly-once semantics (EOS).

Configuration

My listener uses @KafkaListener with this container configuration:

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaBatchListenerContainerFactory(
                ConsumerFactory<Object, Object> kafkaConsumerFactory,
                AfterRollbackProcessor<Object, Object> afterRollbackProcessor,
                KafkaTransactionManager<Object, Object> kafkaTransactionManager,
                ConsumerRecordRecoverer myRecoverer
        ) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(kafkaConsumerFactory);
    factory.setAfterRollbackProcessor(afterRollbackProcessor);
    factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
    factory.getContainerProperties().setEosMode(ContainerProperties.EOSMode.BETA);
    factory.setBatchListener(true);
    factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>(myRecoverer));
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
    return factory;
}

@Bean
public AfterRollbackProcessor<?, ?> afterRollbackProcessor(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    return new DefaultAfterRollbackProcessor<>(deadLetterPublishingRecoverer);
}

Properties:

spring.kafka.producer.acks = all
spring.kafka.consumer.isolation-level = read_committed
spring.kafka.consumer.group-id = ap_u2
# Use hostname to generate unique transaction id per instance
spring.kafka.producer.transaction-id-prefix = tx_ap_u2_${HOSTNAME}_

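The topic has 3 partitions and I have 3 application instances, each consuming 1 partition.

Problem

I'm losing some messages in a read-process-write scenario: the output topic never receives the processed messages.

The DLT is empty, and the listener carries on processing subsequent messages.

The lost messages correlate with these error logs: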

Nov 2, 2020 @ 16:04:23.696
org.springframework.kafka.core.DefaultKafkaProducerFactory
commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@2a8419e]
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Nov 2, 2020 @ 16:04:23.788
org.springframework.kafka.core.DefaultKafkaProducerFactory
Error during some operation; producer removed from cache: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@2a8419e]

Nov 2, 2020 @ 16:04:23.979
org.springframework.kafka.support.LoggingProducerListener
Exception thrown when sending a message with key='XXX' and payload='XXX' to topic XXX:
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Nov 2, 2020 @ 16:04:23.981
org.apache.kafka.clients.producer.internals.Sender
[Producer clientId=producer-XXXXXX.0, transactionalId=XXXXXX.0] Aborting producer batches due to fatal error
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

26 times on different messages:
Nov 2, 2020 @ 16:04:23.985 to Nov 2, 2020 @ 16:04:24.787
org.springframework.kafka.support.LoggingProducerListener
Exception thrown when sending a message with key='XXX' and payload='XXX' to topic XXX:
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

This log, 3 times:
Nov 2, 2020 @ 16:04:24.893
org.apache.kafka.clients.NetworkClient
[Producer clientId=producer-XXXXXX.0, transactionalId=XXXXXX.0] Uncaught error in request completion:
java.lang.IllegalStateException: Should not reopen a batch which is already aborted.
    at org.apache.kafka.common.record.MemoryRecordsBuilder.reopenAndRewriteProducerState(MemoryRecordsBuilder.java:295)

6 times:
Nov 2, 2020 @ 16:04:24.890 to Nov 2, 2020 @ 16:04:24.887
org.springframework.kafka.support.LoggingProducerListener
Exception thrown when sending a message with key='XXX' and payload='XXX' to topic XXX:
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Nov 2, 2020 @ 16:04:25.085
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerCons
Producer or 'group.instance.id' fenced during transaction
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

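Question

Why did I lose 244 messages with a read-process-write transactional listener using EOS Beta semantics? There is no further information in the logs beyond the fact that the transaction was fenced.

Thanks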

Producer or 'group.instance.id' fenced during transaction

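For a ProducerFencedException, the after-rollback processor is not called, because the exception means the consumer has already lost those partitions. You should see rebalance activity after that error. Since the offsets were not sent to the transaction, the same records should be redelivered to this instance or to another one.

If you don't see a rebalance after that error, then something strange is indeed going on.

EDIT

I just wrote a small test app and it behaves exactly as expected: with 2 instances running, I get the same error as you, and it keeps failing on the same record.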
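To make the expected behaviour concrete, here is a toy in-memory sketch in plain Java. It is not the Kafka client API: ToyBroker, FencedException and processOnce are hypothetical names invented for illustration. It only models the idea that output records and consumed offsets are committed together, so a fenced commit leaves both untouched and the same records come back on the next poll. It does not reproduce the message loss described in the question.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of read-process-write; NOT the Kafka client API. */
public class FencedTransactionSketch {

    /** Hypothetical stand-in for a commit rejected because of a stale producer epoch. */
    static class FencedException extends RuntimeException {
        FencedException(String message) { super(message); }
    }

    /** Hypothetical broker state: input log, output log, committed offset, current epoch. */
    static class ToyBroker {
        final List<String> input;
        final List<String> output = new ArrayList<>();
        int committedOffset = 0;
        int currentEpoch = 0;

        ToyBroker(List<String> input) { this.input = input; }

        /** Returns the records from the committed offset onward. */
        List<String> poll() {
            return new ArrayList<>(input.subList(committedOffset, input.size()));
        }

        /** Simulates an expired transaction or a newer producer taking over. */
        void bumpEpoch() { currentEpoch++; }

        /** Output and offset are applied together, or not at all. */
        void commit(int producerEpoch, List<String> pendingOutput, int newOffset) {
            if (producerEpoch != currentEpoch) {
                throw new FencedException("stale epoch " + producerEpoch);
            }
            output.addAll(pendingOutput);
            committedOffset = newOffset;
        }
    }

    /** One read-process-write pass; returns true if the transaction committed. */
    static boolean processOnce(ToyBroker broker, int producerEpoch, boolean fenceBeforeCommit) {
        List<String> batch = broker.poll();
        List<String> pending = new ArrayList<>();
        for (String record : batch) {
            pending.add(record.toUpperCase());
        }
        if (fenceBeforeCommit) {
            broker.bumpEpoch();
        }
        try {
            broker.commit(producerEpoch, pending, broker.committedOffset + batch.size());
            return true;
        } catch (FencedException e) {
            return false; // nothing was committed: no output, offset unchanged
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker(List.of("a", "b", "c"));
        boolean first = processOnce(broker, 0, true); // fenced attempt
        System.out.println(first + " " + broker.output + " " + broker.committedOffset);
        boolean second = processOnce(broker, broker.currentEpoch, false); // retry, fresh epoch
        System.out.println(second + " " + broker.output + " " + broker.committedOffset);
    }
}
```

In this model the fenced attempt changes nothing, and the retry sees all three records again. That is the redelivery described above; records disappearing without a redelivery is the symptom that does not fit it.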

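import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class So64665725Application {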

    public static void main(String[] args) {
        SpringApplication.run(So64665725Application.class, args);
    }

    @Autowired
    KafkaTemplate<String, String> template;

    @KafkaListener(id = "so64665725", topics = "so64665725-1")
    public void listen(String in) throws Exception {
        System.out.println(in);
        Thread.sleep(15000);
        try {
            System.out.println(this.template.send("so64665725-2", in.toUpperCase()).get(10, TimeUnit.SECONDS)
                    .getRecordMetadata());
        }
        catch (ExecutionException e) {
            e.getCause().printStackTrace();
            throw (Exception) e.getCause();
        }
        catch (TimeoutException e) {
            throw e;
        }
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so64665725-1").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so64665725-2").partitions(1).replicas(1).build();
    }

}

spring.kafka.consumer.properties.max.poll.interval.ms=10000
spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.producer.acks=all
spring.kafka.producer.transaction-id-prefix=tx-

This is with Boot 2.4.0-RC1 and SK 2.6.2, which uses EOSMode.BETA by default.

There is a known bug in Kafka: KAFKA-9803

A solution is implemented in spring-kafka 2.5.8.RELEASE and 2.6.3.RELEASE.

See the stopContainerWhenFenced option.
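As a rough sketch of how that option might be switched on, assuming spring-kafka 2.5.8.RELEASE or 2.6.3.RELEASE (or later in those lines) and a factory bean shaped like the one in the question. The class name FencedContainerConfig is made up for illustration, and the other beans from the question (recoverer, after-rollback processor, batch settings) are left out for brevity:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.transaction.KafkaTransactionManager;

@Configuration
public class FencedContainerConfig { // hypothetical class name

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaBatchListenerContainerFactory(
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTransactionManager<Object, Object> kafkaTransactionManager) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory);

        ContainerProperties props = factory.getContainerProperties();
        props.setTransactionManager(kafkaTransactionManager);
        props.setEosMode(ContainerProperties.EOSMode.BETA);
        // Stop the listener container when the producer is fenced,
        // rather than carrying on with the next records.
        props.setStopContainerWhenFenced(true);
        return factory;
    }
}
```

With this set, the container stops on a ProducerFencedException; as far as I understand it, restarting the container afterwards is then up to the application.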