Kafka 事务生产者抛出 'Timeout expired while initializing transactional state in 60000ms'
Kafka transactional producer throws 'Timeout expired while initializing transactional state in 60000ms'
我已经配置了一个带有 transactionIdPrefix 的 Kafka ProducerFactory,以便使用 @Transactional
启用事务同步(参见 Spring documentation on producer-only transactions)。
我在集成测试中 运行 一个 EmbeddedKafka,看看它的行为如何。
日志显示如下:
DEBUG 8384 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender :
[Producer clientId=producer-1, transactionalId=tx-0-0]
Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=tx-0-0, coordinatorType=TRANSACTION) to node 127.0.0.1:61445 (id: -1 rack: null)
DEBUG 8384 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager :
[Producer clientId=producer-1, transactionalId=tx-0-0]
Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=tx-0-0, coordinatorType=TRANSACTION)
Timeout expired while initializing transactional state in 60000ms.
DefaultKafkaProducerFactory执行时抛出newProducer.initTransactions()
.
我的配置如下:
综合测试
@EmbeddedKafka(brokerProperties = { "transaction.state.log.replication.min.isr=1", "transaction.state.log.replication.factor=1" })
ProducerConfig
@Bean
public ProducerFactory<String, String> transactionalProducerFactory() {
Map<String, Object> configuration = new HashMap<>();
configuration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
String transactionIdPrefix = "tx-0-";
configuration.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(configuration);
factory.setTransactionIdPrefix(transactionIdPrefix);
return factory;
}
@Bean
public KafkaTemplate<String, String> transactionalKafka() {
return new KafkaTemplate<>(transactionalProducerFactory());
}
Spring-卡夫卡版本:2.2.7.RELEASE
我不知道如何前进,我认为我按照文档中的每一步进行操作,Kafka 客户端和代理之间的通信在事务初始化期间应该没问题。
谁能帮我解决这个问题?
多亏了嵌入式 kafka 服务器日志,我可以解决问题。
属性 transaction.state.log.min.isr
默认为2,我不得不用transaction.state.log.min.isr = 1
覆盖它来修复服务器错误。之后我的集成测试通过了。
我已经配置了一个带有 transactionIdPrefix 的 Kafka ProducerFactory,以便使用 @Transactional
启用事务同步(参见 Spring documentation on producer-only transactions)。
我在集成测试中 运行 一个 EmbeddedKafka,看看它的行为如何。
日志显示如下:
DEBUG 8384 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender :
[Producer clientId=producer-1, transactionalId=tx-0-0]
Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=tx-0-0, coordinatorType=TRANSACTION) to node 127.0.0.1:61445 (id: -1 rack: null)
DEBUG 8384 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager :
[Producer clientId=producer-1, transactionalId=tx-0-0]
Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=tx-0-0, coordinatorType=TRANSACTION)
Timeout expired while initializing transactional state in 60000ms.
DefaultKafkaProducerFactory执行时抛出newProducer.initTransactions()
.
我的配置如下:
综合测试
@EmbeddedKafka(brokerProperties = { "transaction.state.log.replication.min.isr=1", "transaction.state.log.replication.factor=1" })
ProducerConfig
@Bean
public ProducerFactory<String, String> transactionalProducerFactory() {
Map<String, Object> configuration = new HashMap<>();
configuration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
configuration.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configuration.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
String transactionIdPrefix = "tx-0-";
configuration.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(configuration);
factory.setTransactionIdPrefix(transactionIdPrefix);
return factory;
}
@Bean
public KafkaTemplate<String, String> transactionalKafka() {
return new KafkaTemplate<>(transactionalProducerFactory());
}
Spring-卡夫卡版本:2.2.7.RELEASE
我不知道如何前进,我认为我按照文档中的每一步进行操作,Kafka 客户端和代理之间的通信在事务初始化期间应该没问题。 谁能帮我解决这个问题?
多亏了嵌入式 kafka 服务器日志,我可以解决问题。
属性 transaction.state.log.min.isr
默认为2,我不得不用transaction.state.log.min.isr = 1
覆盖它来修复服务器错误。之后我的集成测试通过了。