如何在测试期间启动嵌入式 kafka 集群?

How do I spin up an embedded kafka cluster during my test?

为了测试我们的 spring 引导应用程序在 kafka 集群尚未启动时如何处理它,我想在应用程序启动后的某个时间在 junit 测试中启动一个嵌入式 kafka 集群。我怎么能解决这个问题? 据我了解,spring-kafka-test 的 @EmbeddedKafka 在创建 SpringBootTest 的应用程序上下文之前启动集群。有什么方法可以配置该时间吗?

当定义为 bean(通过 @EmbeddedKafka)或作为 JUnit 条件(再次通过 @EmbeddedKafka - 当没有测试 Spring ApplicationContext 时),代理启动于afterPropertiesSet().

您应该能够手动创建代理并在准备就绪时调用 afterPropertiesSet()

这是来自 JUnit5 的代码 EmbeddedkafkaCondition:

@SuppressWarnings("unchecked")
private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) {
    EmbeddedKafkaBroker broker;
    int[] ports = setupPorts(embedded);
    broker = new EmbeddedKafkaBroker(embedded.count(), embedded.controlledShutdown(),
                    embedded.partitions(), embedded.topics())
            .zkPort(embedded.zookeeperPort())
            .kafkaPorts(ports)
            .zkConnectionTimeout(embedded.zkConnectionTimeout())
            .zkSessionTimeout(embedded.zkSessionTimeout());
    Properties properties = new Properties();

    for (String pair : embedded.brokerProperties()) {
        if (!StringUtils.hasText(pair)) {
            continue;
        }
        try {
            properties.load(new StringReader(pair));
        }
        catch (Exception ex) {
            throw new IllegalStateException("Failed to load broker property from [" + pair + "]",
                    ex);
        }
    }
    if (StringUtils.hasText(embedded.brokerPropertiesLocation())) {
        Resource propertiesResource = new PathMatchingResourcePatternResolver()
                .getResource(embedded.brokerPropertiesLocation());
        if (!propertiesResource.exists()) {
            throw new IllegalStateException(
                    "Failed to load broker properties from [" + propertiesResource
                            + "]: resource does not exist.");
        }
        try (InputStream in = propertiesResource.getInputStream()) {
            Properties p = new Properties();
            p.load(in);
            p.forEach(properties::putIfAbsent);
        }
        catch (IOException ex) {
            throw new IllegalStateException(
                    "Failed to load broker properties from [" + propertiesResource + "]", ex);
        }
    }
    broker.brokerProperties((Map<String, String>) (Map<?, ?>) properties);
    if (StringUtils.hasText(embedded.bootstrapServersProperty())) {
        broker.brokerListProperty(embedded.bootstrapServersProperty());
    }
    broker.afterPropertiesSet();
    return broker;
}