Producer#initTransactions 不适用于 KafkaContainer
Producer#initTransactions doesn't work with KafkaContainer
我尝试通过事务向 Kafka 发送消息。所以,我使用这个代码:
try (Producer<Void, String> producer = createProducer(kafkaContainerBootstrapServers)) {
producer.initTransactions();
producer.beginTransaction();
Arrays.stream(messages).forEach(
message -> producer.send(new ProducerRecord<>(KAFKA_INPUT_TOPIC, message)));
producer.commitTransaction();
}
...
private static Producer<Void, String> createProducer(String kafkaContainerBootstrapServers) {
return new KafkaProducer<>(
ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainerBootstrapServers,
ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString()
),
new VoidSerializer(),
new StringSerializer());
}
如果我使用本地 Kafka,效果很好。
但是如果我使用 Kafka TestContainers,它会在 producer.initTransactions()
:
冻结
private static final String KAFKA_VERSION = "4.1.1";
@Rule
public KafkaContainer kafka = new KafkaContainer(KAFKA_VERSION)
.withEmbeddedZookeeper();
如何配置 KafkaContainer 以处理事务?
尝试使用 Kafka for JUnit 而不是 Kafka 测试容器。我在交易方面遇到了同样的问题,并以这种方式让他们活着。
我使用的Maven依赖:
<dependency>
<groupId>net.mguenther.kafka</groupId>
<artifactId>kafka-junit</artifactId>
<version>2.1.0</version>
<scope>test</scope>
</dependency>
我在使用 Kafka for JUnit as @AntonLitvinenko suggested. My question about it 时遇到异常。
我添加了这个依赖项来修复它(参见 issue):
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.12.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
此外,我使用 2.0.1
版本的 kafka-junit 和 kafka_2.11:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafkaVersion}</version>
<scope>test</scope>
</dependency>
我尝试通过事务向 Kafka 发送消息。所以,我使用这个代码:
try (Producer<Void, String> producer = createProducer(kafkaContainerBootstrapServers)) {
producer.initTransactions();
producer.beginTransaction();
Arrays.stream(messages).forEach(
message -> producer.send(new ProducerRecord<>(KAFKA_INPUT_TOPIC, message)));
producer.commitTransaction();
}
...
private static Producer<Void, String> createProducer(String kafkaContainerBootstrapServers) {
return new KafkaProducer<>(
ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainerBootstrapServers,
ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString()
),
new VoidSerializer(),
new StringSerializer());
}
如果我使用本地 Kafka,效果很好。
但是如果我使用 Kafka TestContainers,它会在 producer.initTransactions()
:
private static final String KAFKA_VERSION = "4.1.1";
@Rule
public KafkaContainer kafka = new KafkaContainer(KAFKA_VERSION)
.withEmbeddedZookeeper();
如何配置 KafkaContainer 以处理事务?
尝试使用 Kafka for JUnit 而不是 Kafka 测试容器。我在交易方面遇到了同样的问题,并以这种方式让他们活着。
我使用的Maven依赖:
<dependency>
<groupId>net.mguenther.kafka</groupId>
<artifactId>kafka-junit</artifactId>
<version>2.1.0</version>
<scope>test</scope>
</dependency>
我在使用 Kafka for JUnit as @AntonLitvinenko suggested. My question about it
我添加了这个依赖项来修复它(参见 issue):
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.12.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
此外,我使用 2.0.1
版本的 kafka-junit 和 kafka_2.11:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafkaVersion}</version>
<scope>test</scope>
</dependency>