如何在测试期间启动嵌入式 kafka 集群?
How do I spin up an embedded kafka cluster during my test?
为了测试我们的 spring 引导应用程序在 kafka 集群尚未启动时如何处理它,我想在应用程序启动后的某个时间在 junit 测试中启动一个嵌入式 kafka 集群。我怎么能解决这个问题?
据我了解,spring-kafka-test 的 @EmbeddedKafka
在创建 SpringBootTest 的应用程序上下文之前启动集群。有什么方法可以配置该时间吗?
当定义为 bean(通过 @EmbeddedKafka
)或作为 JUnit 条件(再次通过 @EmbeddedKafka
- 当没有测试 Spring ApplicationContext 时),代理启动于afterPropertiesSet()
.
您应该能够手动创建代理并在准备就绪时调用 afterPropertiesSet()
。
这是来自 JUnit5 的代码 EmbeddedkafkaCondition
:
@SuppressWarnings("unchecked")
private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) {
EmbeddedKafkaBroker broker;
int[] ports = setupPorts(embedded);
broker = new EmbeddedKafkaBroker(embedded.count(), embedded.controlledShutdown(),
embedded.partitions(), embedded.topics())
.zkPort(embedded.zookeeperPort())
.kafkaPorts(ports)
.zkConnectionTimeout(embedded.zkConnectionTimeout())
.zkSessionTimeout(embedded.zkSessionTimeout());
Properties properties = new Properties();
for (String pair : embedded.brokerProperties()) {
if (!StringUtils.hasText(pair)) {
continue;
}
try {
properties.load(new StringReader(pair));
}
catch (Exception ex) {
throw new IllegalStateException("Failed to load broker property from [" + pair + "]",
ex);
}
}
if (StringUtils.hasText(embedded.brokerPropertiesLocation())) {
Resource propertiesResource = new PathMatchingResourcePatternResolver()
.getResource(embedded.brokerPropertiesLocation());
if (!propertiesResource.exists()) {
throw new IllegalStateException(
"Failed to load broker properties from [" + propertiesResource
+ "]: resource does not exist.");
}
try (InputStream in = propertiesResource.getInputStream()) {
Properties p = new Properties();
p.load(in);
p.forEach(properties::putIfAbsent);
}
catch (IOException ex) {
throw new IllegalStateException(
"Failed to load broker properties from [" + propertiesResource + "]", ex);
}
}
broker.brokerProperties((Map<String, String>) (Map<?, ?>) properties);
if (StringUtils.hasText(embedded.bootstrapServersProperty())) {
broker.brokerListProperty(embedded.bootstrapServersProperty());
}
broker.afterPropertiesSet();
return broker;
}
为了测试我们的 spring 引导应用程序在 kafka 集群尚未启动时如何处理它,我想在应用程序启动后的某个时间在 junit 测试中启动一个嵌入式 kafka 集群。我怎么能解决这个问题?
据我了解,spring-kafka-test 的 @EmbeddedKafka
在创建 SpringBootTest 的应用程序上下文之前启动集群。有什么方法可以配置该时间吗?
当定义为 bean(通过 @EmbeddedKafka
)或作为 JUnit 条件(再次通过 @EmbeddedKafka
- 当没有测试 Spring ApplicationContext 时),代理启动于afterPropertiesSet()
.
您应该能够手动创建代理并在准备就绪时调用 afterPropertiesSet()
。
这是来自 JUnit5 的代码 EmbeddedkafkaCondition
:
@SuppressWarnings("unchecked")
private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) {
EmbeddedKafkaBroker broker;
int[] ports = setupPorts(embedded);
broker = new EmbeddedKafkaBroker(embedded.count(), embedded.controlledShutdown(),
embedded.partitions(), embedded.topics())
.zkPort(embedded.zookeeperPort())
.kafkaPorts(ports)
.zkConnectionTimeout(embedded.zkConnectionTimeout())
.zkSessionTimeout(embedded.zkSessionTimeout());
Properties properties = new Properties();
for (String pair : embedded.brokerProperties()) {
if (!StringUtils.hasText(pair)) {
continue;
}
try {
properties.load(new StringReader(pair));
}
catch (Exception ex) {
throw new IllegalStateException("Failed to load broker property from [" + pair + "]",
ex);
}
}
if (StringUtils.hasText(embedded.brokerPropertiesLocation())) {
Resource propertiesResource = new PathMatchingResourcePatternResolver()
.getResource(embedded.brokerPropertiesLocation());
if (!propertiesResource.exists()) {
throw new IllegalStateException(
"Failed to load broker properties from [" + propertiesResource
+ "]: resource does not exist.");
}
try (InputStream in = propertiesResource.getInputStream()) {
Properties p = new Properties();
p.load(in);
p.forEach(properties::putIfAbsent);
}
catch (IOException ex) {
throw new IllegalStateException(
"Failed to load broker properties from [" + propertiesResource + "]", ex);
}
}
broker.brokerProperties((Map<String, String>) (Map<?, ?>) properties);
if (StringUtils.hasText(embedded.bootstrapServersProperty())) {
broker.brokerListProperty(embedded.bootstrapServersProperty());
}
broker.afterPropertiesSet();
return broker;
}