在 Kafka 侦听器线程中生成记录时获取 ProducerFencedException
Getting ProducerFencedException on producing record in Kafka listener thread
我在 kafka 侦听器容器中生成消息时遇到此异常。
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-tx-group.topicA.1
org.apache.kafka.common.errors.ProducerFencedException: The producer has been rejected from the broker because it tried to use an old epoch with the transactionalId
我的听众长这样
@Transactional
@kafkaListener(...)
listener(topicA, message){
process(message)
produce(topicB, notification) // use Kafkatemplate to send the message
}
我的配置是这样的
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(KafkaTransactionManager kafkaTransactionManager) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
return factory;
}
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
DefaultKafkaProducerFactory<String, Object> factory = new
DefaultKafkaProducerFactory<>(props);
factory.setTransactionIdPrefix(transactionIdPrefix);
return factory;
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory());
return template;
}
@Bean
public KafkaTransactionManager kafkaTransactionManager() {
KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory());
return manager;
}
我知道 Kafka 何时抛出 ProducerFencedException,但是我想在这里弄清楚第二个具有相同 transaction.id.
的生产者在哪里
如果我在 Kafka 模板中设置唯一事务前缀,它就可以正常工作
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory());
template.setTransactionIdPrefix(MessageFormat.format("{0}-{1}", transactionIdPrefix, UUID.randomUUID().toString()));
return template;
}
但我试图理解这里的异常,从那里开始另一个生产者使用相同的交易 ID,按照 spring 文档 group.id/topic/partition
[,监听器开始的交易遵循这种模式=15=]
我只是在本地对单个应用程序实例进行尝试。
我找到了根本原因,我在这里创建了两个生产者实例
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
DefaultKafkaProducerFactory<String, Object> factory = new
DefaultKafkaProducerFactory<>(props);
factory.setTransactionIdPrefix(transactionIdPrefix);
return factory;
}
我缺少 Bean 配置。
在生产因子上添加@Bean 并在模板和 TM 中正确自动连接它解决了这个问题。
我在 kafka 侦听器容器中生成消息时遇到此异常。
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-tx-group.topicA.1
org.apache.kafka.common.errors.ProducerFencedException: The producer has been rejected from the broker because it tried to use an old epoch with the transactionalId
我的听众长这样
@Transactional
@kafkaListener(...)
listener(topicA, message){
process(message)
produce(topicB, notification) // use Kafkatemplate to send the message
}
我的配置是这样的
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(KafkaTransactionManager kafkaTransactionManager) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
return factory;
}
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
DefaultKafkaProducerFactory<String, Object> factory = new
DefaultKafkaProducerFactory<>(props);
factory.setTransactionIdPrefix(transactionIdPrefix);
return factory;
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory());
return template;
}
@Bean
public KafkaTransactionManager kafkaTransactionManager() {
KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory());
return manager;
}
我知道 Kafka 何时抛出 ProducerFencedException,但是我想在这里弄清楚第二个具有相同 transaction.id.
的生产者在哪里如果我在 Kafka 模板中设置唯一事务前缀,它就可以正常工作
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory());
template.setTransactionIdPrefix(MessageFormat.format("{0}-{1}", transactionIdPrefix, UUID.randomUUID().toString()));
return template;
}
但我试图理解这里的异常,从那里开始另一个生产者使用相同的交易 ID,按照 spring 文档 group.id/topic/partition
[,监听器开始的交易遵循这种模式=15=]
我只是在本地对单个应用程序实例进行尝试。
我找到了根本原因,我在这里创建了两个生产者实例
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
DefaultKafkaProducerFactory<String, Object> factory = new
DefaultKafkaProducerFactory<>(props);
factory.setTransactionIdPrefix(transactionIdPrefix);
return factory;
}
我缺少 Bean 配置。 在生产因子上添加@Bean 并在模板和 TM 中正确自动连接它解决了这个问题。