spring Kafka 批处理侦听器是否以批处理模式提交数据库事务,如果失败,是否回滚完整的事务?

Does spring Kafka batch listener commits db transaction in batch mode and in case of failure is the complete transaction rolled back?

我有一个简单的要求,即读取 kafka 消息并将其存储在数据库中。我在批处理侦听器模式下使用 spring kafka。我已经阅读了 spring kafka 文档,但仍然不清楚在批处理侦听器模式下使用 spring kafka 时,它是否以批处理模式提交数据库事务,如果失败,是否会回滚完整的事务?

万一失败会再找同一套记录吗?

我有以下配置,

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigProperties.getBootstrapservers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfigProperties.getConsumer().getGroupid());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConfigProperties.getConsumer().getOffset());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,250);
props.put(ApplicationConstant.KAFKA_SCHEMA_URL_PROPERTY, kafkaConfigProperties.getSchemaregistry());
@Bean
    public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory(KafkaConfigProperties kafkaConfigProperties) {
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        
        factory.setConsumerFactory(consumerFactory(kafkaConfigProperties));
        factory.setConcurrency(2);
        factory.setBatchListener(true);
        ContainerProperties containerProperties = factory.getContainerProperties();
        containerProperties.setAckOnError(false);
        containerProperties.setAckMode(AckMode.BATCH);
        return factory;
    }

您需要添加 SeekToCurrentBatchErrorHandlerRecoveringBatchErrorHander 才能重播该批次。这是 2.5 及更高版本的默认错误处理程序。

参见the documentation