在 Kafka 侦听器线程中生成记录时获取 ProducerFencedException

Getting ProducerFencedException on producing record in Kafka listener thread

我在 kafka 侦听器容器中生成消息时遇到此异常。

javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-tx-group.topicA.1
org.apache.kafka.common.errors.ProducerFencedException: The producer has been rejected from the broker because it tried to use an old epoch with the transactionalId

我的听众长这样

@Transactional
@kafkaListener(...)
listener(topicA, message){
 process(message)
 produce(topicB, notification) // use Kafkatemplate to send the message
}

我的配置是这样的

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(KafkaTransactionManager kafkaTransactionManager) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
              new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
        return factory;
    }

    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
        DefaultKafkaProducerFactory<String, Object> factory = new 
        DefaultKafkaProducerFactory<>(props);
        factory.setTransactionIdPrefix(transactionIdPrefix);
        return factory;
    }


    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory());
        return template;
    }

    @Bean
    public KafkaTransactionManager kafkaTransactionManager() {
        KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory());
        return manager;
    }

我知道 Kafka 何时抛出 ProducerFencedException,但是我想在这里弄清楚第二个具有相同 transaction.id.

的生产者在哪里

如果我在 Kafka 模板中设置唯一事务前缀,它就可以正常工作

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory());
        template.setTransactionIdPrefix(MessageFormat.format("{0}-{1}", transactionIdPrefix, UUID.randomUUID().toString()));
        return template;
    }

但我试图理解这里的异常,从那里开始另一个生产者使用相同的交易 ID,按照 spring 文档 group.id/topic/partition[,监听器开始的交易遵循这种模式=15=]

我只是在本地对单个应用程序实例进行尝试。

我找到了根本原因,我在这里创建了两个生产者实例

    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
        DefaultKafkaProducerFactory<String, Object> factory = new 
        DefaultKafkaProducerFactory<>(props);
        factory.setTransactionIdPrefix(transactionIdPrefix);
        return factory;
    }

我缺少 Bean 配置。 在生产因子上添加@Bean 并在模板和 TM 中正确自动连接它解决了这个问题。