Spring Kafka Embedded - 主题在测试之间已经存在
Spring Kafka Embedded - topic already exists between tests
我创建了一组带有嵌入式 kafka (spring-kafka-test) 的测试 (JUnit 5),当我 运行 他们有时(不总是)我得到“主题 'some_name' 已存在 " 在单个 运行.
中的一项或多项测试中
所有测试都使用相同的主题名称(我不想为每个测试更改该名称),测试 class 具有 DirtiesContext 注释 (AFTER_EACH_TEST_METHOD)。我不确定这个问题的原因是什么,以及如何解决它。
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"some_name"}, ports = 9092)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@ActiveProfiles("test")
public class RemovalKafkaTestIT {
private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, true, TOPIC);
private final static String SERVER_ADDRES = "127.0.0.1:9092";
private Consumer<String, String> prepareConsumer() {
Map<String, Object> configsConsumer = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
configsConsumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configsConsumer.put("bootstrap.servers", SERVER_ADDRES);
Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(configsConsumer, new StringDeserializer(), new StringDeserializer()).createConsumer();
consumer.subscribe(singleton("some_name"));
return consumer;
}
@Test
public void someMethodWithKafka1() {
// some logic
...
// check topic content
Consumer<String, String> consumer = this.prepareConsumer();
embeddedKafkaBroker.consumeFromEmbeddedTopics(consumer, "some_name");
ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
assertThat(records.count()).isEqualTo(1); // and other checks :)
// clean
consumer.commitSync();
consumer.close();
}
@Test
public void someMethodWithKafka2() {
// some other logic
...
// check topic content
Consumer<String, String> consumer = this.prepareConsumer();
embeddedKafkaBroker.consumeFromEmbeddedTopics(consumer, "some_name");
ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
assertThat(records.count()).isEqualTo(1); // and other checks :)
// clean
consumer.commitSync();
consumer.close();
}
}
尝试将您的 EmbeddedKafkaBroker
标记为 bean 或使用 @Autowire
从顶级注释创建代理。
因为代理未标记为 bean,所以它的生命周期不会由应用程序上下文管理,并且不会在 @DirtiesContext
的运行之间被清除。也许它坚持让话题跨越测试边界的东西。
您有两个经纪人;您自己创建的:
private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, true, TOPIC);
还有一个由 Spring 管理:
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"some_name"}, ports = 9092)
当您将 @EmbeddedKafka
与 Spring 测试上下文一起使用时;代理已添加到上下文中。
改为
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
并且不要添加另一个 bean。
通常,为每个测试使用不同的主题更容易(也更快);避免为每个测试创建代理。
编辑
ports = 9092
改为使用随机端口(省略此配置)并使用
configsConsumer.put("bootstrap.servers", this.embeddedKafkaBroker.getBrokersAsString());
我创建了一组带有嵌入式 kafka (spring-kafka-test) 的测试 (JUnit 5),当我 运行 他们有时(不总是)我得到“主题 'some_name' 已存在 " 在单个 运行.
中的一项或多项测试中所有测试都使用相同的主题名称(我不想为每个测试更改该名称),测试 class 具有 DirtiesContext 注释 (AFTER_EACH_TEST_METHOD)。我不确定这个问题的原因是什么,以及如何解决它。
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"some_name"}, ports = 9092)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@ActiveProfiles("test")
public class RemovalKafkaTestIT {
private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, true, TOPIC);
private final static String SERVER_ADDRES = "127.0.0.1:9092";
private Consumer<String, String> prepareConsumer() {
Map<String, Object> configsConsumer = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
configsConsumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configsConsumer.put("bootstrap.servers", SERVER_ADDRES);
Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(configsConsumer, new StringDeserializer(), new StringDeserializer()).createConsumer();
consumer.subscribe(singleton("some_name"));
return consumer;
}
@Test
public void someMethodWithKafka1() {
// some logic
...
// check topic content
Consumer<String, String> consumer = this.prepareConsumer();
embeddedKafkaBroker.consumeFromEmbeddedTopics(consumer, "some_name");
ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
assertThat(records.count()).isEqualTo(1); // and other checks :)
// clean
consumer.commitSync();
consumer.close();
}
@Test
public void someMethodWithKafka2() {
// some other logic
...
// check topic content
Consumer<String, String> consumer = this.prepareConsumer();
embeddedKafkaBroker.consumeFromEmbeddedTopics(consumer, "some_name");
ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
assertThat(records.count()).isEqualTo(1); // and other checks :)
// clean
consumer.commitSync();
consumer.close();
}
}
尝试将您的 EmbeddedKafkaBroker
标记为 bean 或使用 @Autowire
从顶级注释创建代理。
因为代理未标记为 bean,所以它的生命周期不会由应用程序上下文管理,并且不会在 @DirtiesContext
的运行之间被清除。也许它坚持让话题跨越测试边界的东西。
您有两个经纪人;您自己创建的:
private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, true, TOPIC);
还有一个由 Spring 管理:
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"some_name"}, ports = 9092)
当您将 @EmbeddedKafka
与 Spring 测试上下文一起使用时;代理已添加到上下文中。
改为
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
并且不要添加另一个 bean。
通常,为每个测试使用不同的主题更容易(也更快);避免为每个测试创建代理。
编辑
ports = 9092
改为使用随机端口(省略此配置)并使用
configsConsumer.put("bootstrap.servers", this.embeddedKafkaBroker.getBrokersAsString());