Spring Kafka ChainedKafkaTransactionManager doesn't synchronize with JPA Spring-data transaction

I have read a lot of Gary Russell's answers and posts, but haven't found an actual solution for the common use case of synchronizing the following sequence:

receive from topic A => save to DB via Spring-data => send to topic B

As I understand it: fully atomic processing cannot be guaranteed in that case and I need to handle message deduplication on the client side, but the main problem is that the ChainedKafkaTransactionManager does not synchronize with the JpaTransactionManager (see the @KafkaListener below).
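The client-side deduplication mentioned above could look like this minimal sketch. It is not from the original question; the class name, key format, and in-memory set are assumptions (in production, the "seen" state would typically live in the database, e.g. as a unique constraint on topic/partition/offset checked inside the JPA transaction):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical client-side deduplication: remember which records were already
// persisted so that a record redelivered after a rollback is not stored twice.
public class DedupTracker {

    private final Set<String> seenKeys = new HashSet<>();

    // Returns true if the record was not seen before and should be processed.
    public boolean markIfNew(String topic, int partition, long offset) {
        return seenKeys.add(topic + "-" + partition + "-" + offset);
    }

    public static void main(String[] args) {
        DedupTracker tracker = new DedupTracker();
        System.out.println(tracker.markIfNew("topicA", 0, 42)); // first delivery
        System.out.println(tracker.markIfNew("topicA", 0, 42)); // redelivery
    }
}
```

In the listener, a `false` return would mean "already persisted, only re-send / acknowledge, don't store again".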

Kafka configuration:

@Production
@EnableKafka
@Configuration
@EnableTransactionManagement
public class KafkaConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaConfig.class);

    @Bean
    public ConsumerFactory<String, byte[]> commonConsumerFactory(@Value("${kafka.broker}") String bootstrapServer) {

        Map<String, Object> props = new HashMap<>();
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(SESSION_TIMEOUT_MS_CONFIG, 10000);
        props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(MAX_POLL_RECORDS_CONFIG, 10);
        props.put(MAX_POLL_INTERVAL_MS_CONFIG, 17000);
        props.put(FETCH_MIN_BYTES_CONFIG, 1048576);
        props.put(FETCH_MAX_WAIT_MS_CONFIG, 1000);
        props.put(ISOLATION_LEVEL_CONFIG, "read_committed");

        props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory(
            @Qualifier("commonConsumerFactory") ConsumerFactory<String, byte[]> consumerFactory,
            @Qualifier("chainedKafkaTM") ChainedKafkaTransactionManager chainedKafkaTM,
            @Qualifier("kafkaTemplate") KafkaTemplate<String, byte[]> kafkaTemplate,
            @Value("${kafka.concurrency:#{T(java.lang.Runtime).getRuntime().availableProcessors()}}") Integer concurrency
    ) {

        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setMissingTopicsFatal(false);
        factory.getContainerProperties().setTransactionManager(chainedKafkaTM);

        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        var arbp = new DefaultAfterRollbackProcessor<String, byte[]>(new FixedBackOff(1000L, 3));
        arbp.setCommitRecovered(true);
        arbp.setKafkaTemplate(kafkaTemplate);

        factory.setAfterRollbackProcessor(arbp);
        factory.setConcurrency(concurrency);

        factory.afterPropertiesSet();

        return factory;
    }

    @Bean
    public ProducerFactory<String, byte[]> producerFactory(@Value("${kafka.broker}") String bootstrapServer) {

        Map<String, Object> configProps = new HashMap<>();

        configProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        configProps.put(BATCH_SIZE_CONFIG, 16384);
        configProps.put(ENABLE_IDEMPOTENCE_CONFIG, true);

        configProps.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        var kafkaProducerFactory = new DefaultKafkaProducerFactory<String, byte[]>(configProps);
        kafkaProducerFactory.setTransactionIdPrefix("kafka-tx-");

        return kafkaProducerFactory;
    }

    @Bean
    public KafkaTemplate<String, byte[]> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    @Bean
    public KafkaTransactionManager<String, byte[]> kafkaTransactionManager(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) {
        KafkaTransactionManager<String, byte[]> ktm = new KafkaTransactionManager<>(producerFactory);
        ktm.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }

    @Bean
    public ChainedKafkaTransactionManager chainedKafkaTM(JpaTransactionManager jpaTransactionManager,
                                                         KafkaTransactionManager kafkaTransactionManager) {
        return new ChainedKafkaTransactionManager(kafkaTransactionManager, jpaTransactionManager);
    }

    @Bean(name = "transactionManager")
    public JpaTransactionManager transactionManager(EntityManagerFactory em) {
        return new JpaTransactionManager(em);
    }
}

Kafka listener:

@KafkaListener(groupId = "${group.id}", idIsGroup = false, topics = "${topic.name.import}")
public void consume(List<byte[]> records, @Header(KafkaHeaders.OFFSET) Long offset) {
    for (byte[] record : records) {
        // causes an infinite rollback loop (perhaps due to the batch listener)
        if (true)
            throw new RuntimeException("foo");

        // Spring Data storage with @Transactional("chainedKafkaTM"), since Spring Data
        // can't pick the TM among transactionManager, chainedKafkaTM and kafkaTransactionManager
        var result = storageService.persist(record);

        kafkaTemplate.send(result);
    }
}

Spring Kafka version: 2.3.3, Spring Boot version: 2.2.1

What is the correct way to implement such a use case? The Spring Kafka documentation is limited to small/specific examples.

P.S. When I use @Transactional(transactionManager = "chainedKafkaTM", rollbackFor = Exception.class) on the @KafkaListener method, I face an endless cyclic rollback, even though FixedBackOff(1000L, 3L) is set.

EDIT: I plan to achieve the maximum affordable synchronization between listener, producer and database, with a configurable number of retries.

EDIT: The snippet above has been edited according to the suggested configuration. Using the ARBP does not solve the infinite rollback loop for me, because the predicate of the first statement is always false (SeekUtils.doSeeks):

DefaultAfterRollbackProcessor
...
@Override
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception,
        boolean recoverable) {

    if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
            getSkipPredicate((List) records, exception), LOGGER)
                && isCommitRecovered() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
        ConsumerRecord<K, V> skipped = records.get(0);
        this.kafkaTemplate.sendOffsetsToTransaction(
                Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
                        new OffsetAndMetadata(skipped.offset() + 1)));
    }
}

It is worth mentioning that there is no active transaction inside the Kafka consumer method (TransactionSynchronizationManager.isActualTransactionActive() returns false).

What makes you think it doesn't synchronize? You really don't need @Transactional, since the container will start both transactions.

You should not use a SeekToCurrentErrorHandler with transactions, because it runs within the transaction. Configure an after-rollback processor instead. The default ARBP uses a FixedBackOff(0L, 9) (10 attempts).

This works fine for me, and stops after 4 delivery attempts:

@SpringBootApplication
public class So58804826Application {

    public static void main(String[] args) {
        SpringApplication.run(So58804826Application.class, args);
    }

    @Bean
    public JpaTransactionManager transactionManager() {
        return new JpaTransactionManager();
    }


    @Bean
    public ChainedKafkaTransactionManager<?, ?> chainedTxM(JpaTransactionManager jpa,
            KafkaTransactionManager<?, ?> kafka) {

        kafka.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return new ChainedKafkaTransactionManager<>(kafka, jpa);
    }

    @Autowired
    private Saver saver;

    @KafkaListener(id = "so58804826", topics = "so58804826")
    public void listen(String in) {
        System.out.println("Storing: " + in);
        this.saver.save(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so58804826")
                .partitions(1)
                .replicas(1)
                .build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
//          template.executeInTransaction(t -> t.send("so58804826", "foo"));
        };
    }

}

@Component
class ContainerFactoryConfigurer {

    ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
            ChainedKafkaTransactionManager<?, ?> tm) {

        factory.getContainerProperties().setTransactionManager(tm);
        factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(new FixedBackOff(1000L, 3)));
    }

}

@Component
class Saver {

    @Autowired
    private MyEntityRepo repo;

    private final AtomicInteger ids = new AtomicInteger();

    @Transactional("chainedTxM")
    public void save(String in) {
        this.repo.save(new MyEntity(in, this.ids.incrementAndGet()));
        throw new RuntimeException("foo");
    }

}

I see "Participating in existing transaction" from both TxMs.

With @Transactional("transactionManager"), I only get it from the JPA TM, as one would expect.

EDIT

A batch listener has no concept of "recovery"; the framework cannot know which record within the batch needs to be skipped. In 2.3, we added a new feature for batch listeners when using MANUAL ack mode.

Committing Offsets

Starting with version 2.3, the Acknowledgment interface has two additional methods nack(long sleep) and nack(int index, long sleep). The first one is used with a record listener, the second with a batch listener. Calling the wrong method for your listener type will throw an IllegalStateException.

When using a batch listener, you can specify the index within the batch where the failure occurred. When nack() is called, offsets will be committed for records before the index and seeks are performed on the partitions for the failed and discarded records so that they will be redelivered on the next poll(). This is an improvement over the SeekToCurrentBatchErrorHandler, which can only seek the entire batch for redelivery.
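The commit/seek split described above can be sketched in plain Java. This is not the Spring Kafka implementation, just an illustration of the offset arithmetic behind `nack(index, sleep)` (class and method names are made up):

```java
import java.util.List;

// Illustrates nack(index, sleep) for a batch listener: offsets of records
// before the failed index are committed; the failed record and the rest of
// the batch are sought so they are redelivered on the next poll().
public class NackSemantics {

    // Given the offsets of a polled batch and the index of the failing record,
    // returns the offset that will be committed (i.e. the next offset to consume).
    static long committedOffset(List<Long> batchOffsets, int failedIndex) {
        if (failedIndex == 0) {
            return batchOffsets.get(0);               // nothing committed, whole batch redelivered
        }
        return batchOffsets.get(failedIndex - 1) + 1; // commit up to the last good record
    }

    public static void main(String[] args) {
        List<Long> batch = List.of(100L, 101L, 102L, 103L);
        // failure at index 2: offsets 100..101 are committed, 102..103 redelivered
        System.out.println(committedOffset(batch, 2));
    }
}
```

This is the improvement over SeekToCurrentBatchErrorHandler: only the tail of the batch from the failed index onward is redelivered, not the entire batch.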

However, the failed record will still be replayed indefinitely.

You could keep track of a record that keeps failing and nack with index + 1 to skip it.

However, since your JPA transaction has been rolled back, that won't work for you.

For a batch listener, you have to handle problems with the batch in your listener code.
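Such in-listener bookkeeping for the "track the failing record, then skip it" idea might look like the following sketch. The class, the in-memory counter and the attempt limit are assumptions, not part of Spring Kafka:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-listener bookkeeping: count how many times the same record
// (identified by topic/partition/offset) has failed, and report when a
// configurable number of delivery attempts has been exhausted.
public class FailureTracker {

    private final int maxAttempts;
    private final Map<String, Integer> attempts = new HashMap<>();

    public FailureTracker(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Returns true when the record should now be skipped (attempts exhausted).
    public boolean recordFailure(String topic, int partition, long offset) {
        String key = topic + "-" + partition + "-" + offset;
        int count = attempts.merge(key, 1, Integer::sum);
        return count >= maxAttempts;
    }
}
```

In the batch listener, `recordFailure(...)` returning true would be the signal to treat the record as a poison pill and skip past it (the "index + 1" case above); otherwise the listener would nack at the failed index so it is retried. Note that anything the JPA transaction wrote for the skipped record has already been rolled back, so skipping means deliberately losing that record or routing it to a dead-letter topic.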