嵌入式 Kafka 测试随机失败
Embedded Kafka tests randomly failing
我使用 EmbededKafka 实施了一系列集成测试,以使用 spring-kafka 框架测试我们的一个 Kafka 流应用程序 运行ning。
流应用程序正在从 Kafka 主题读取消息,它将其存储到内部状态存储中,进行一些 t运行 转换并将其发送到另一个微服务到请求的主题中。当响应返回响应的主题时,它会从状态存储中检索原始消息,并根据某些业务逻辑将其转发到我们的下游系统之一,每个下游系统都有自己的主题。
集成测试只是对业务条件的各种排列进行测试。
最初测试分为 classes。 运行构建时,来自一个 class 的测试与另一个 class 中的测试发生冲突,但有一些冲突例外。我没有在这上面花太多时间,只是将所有测试移到同一个 class 中。这解决了我的所有测试通过 gradle build 或 intelij EDI 的问题。
这是测试:
package au.nab.tlm.streams.integration;
import au.nab.tlm.streams.serde.EntitlementsCheckSerDes;
import au.nab.tlm.streams.test.support.MockEntitlementsCheckSerDes;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
@SpringBootTest
@ContextConfiguration(classes = {MyTopologiesIntegrationTest.TestKafkaConfig.class})
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@EmbeddedKafka(
ports = 9092,
partitions = 1,
topics = {
"topic-1.v1",
"topic-2.v1",
"topic-3.v1",
"topic-4.v1",
"topic-5.v1",
"topic-6.v1",
},
brokerProperties = {"transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1", "log.dir=/tmp/embedded-kafka"}
)
public class MyTopologiesIntegrationTest {
@Autowired
EmbeddedKafkaBroker kafkaBroker;
@Autowired
EntitlementsCheckSerDes appSerDes;
@Test
public void test_1() {
}
@Test
public void test_2() {
}
@Test
public void test_3() {
}
@Test
public void test_4() {
}
@Test
public void test_5() {
}
@TestConfiguration
public static class TestKafkaConfig {
@Bean
EntitlementsCheckSerDes appSerDes() {
return new MockEntitlementsCheckSerDes();
}
}
}
对结果很满意,我推动了我的更改,只是为了注意到构建在我们的 CI 服务器上失败了。 运行 再次失败,这次失败与第一次不同。我让一位同事看了一下,他遇到了与 CI 服务器类似的相同故障经历。我 运行 在我的机器上至少构建了二十次,它总是通过。 运行 对我同事的一项一项测试也总是通过。
我们遇到的最常见的异常是主题 xyz 已经存在,但偶尔会出现一些其他异常,表明集群不能喜欢或相似。所有这些异常都向我们表明,尽管使用 DirtiesContext
annotation.The 的第一个测试 运行 是,但在下一次测试开始之前,先前测试中使用的嵌入式 Kafka 并未完全关闭总是过去。
我们都花了一整天的时间把头发拉下来,根本没办法让它发挥作用。我们尝试了 google 带我们去的任何地方,一点运气也没有。最后我们在测试中留下了唯一的一个测试场景(交互次数最多的那个)class 并禁用了其余的。
显然这不是一个可以接受的永久解决方案,我真的很想了解我们做错了什么以及如何解决它。
提前感谢您的投入。
不要使用固定端口ports = 9092,
- 默认情况下,嵌入式 kafka 将侦听操作系统选择的随机端口。
您应该将其用于您的测试用例。
您可以通过调用 this.kafkaBroker.getBrokersAsString()
获取代理地址。
我使用 EmbededKafka 实施了一系列集成测试,以使用 spring-kafka 框架测试我们的一个 Kafka 流应用程序 运行ning。
流应用程序正在从 Kafka 主题读取消息,它将其存储到内部状态存储中,进行一些 t运行 转换并将其发送到另一个微服务到请求的主题中。当响应返回响应的主题时,它会从状态存储中检索原始消息,并根据某些业务逻辑将其转发到我们的下游系统之一,每个下游系统都有自己的主题。
集成测试只是对业务条件的各种排列进行测试。
最初测试分为 classes。 运行构建时,来自一个 class 的测试与另一个 class 中的测试发生冲突,但有一些冲突例外。我没有在这上面花太多时间,只是将所有测试移到同一个 class 中。这解决了我的所有测试通过 gradle build 或 intelij EDI 的问题。
这是测试:
package au.nab.tlm.streams.integration;
import au.nab.tlm.streams.serde.EntitlementsCheckSerDes;
import au.nab.tlm.streams.test.support.MockEntitlementsCheckSerDes;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
@SpringBootTest
@ContextConfiguration(classes = {MyTopologiesIntegrationTest.TestKafkaConfig.class})
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@EmbeddedKafka(
ports = 9092,
partitions = 1,
topics = {
"topic-1.v1",
"topic-2.v1",
"topic-3.v1",
"topic-4.v1",
"topic-5.v1",
"topic-6.v1",
},
brokerProperties = {"transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1", "log.dir=/tmp/embedded-kafka"}
)
public class MyTopologiesIntegrationTest {
@Autowired
EmbeddedKafkaBroker kafkaBroker;
@Autowired
EntitlementsCheckSerDes appSerDes;
@Test
public void test_1() {
}
@Test
public void test_2() {
}
@Test
public void test_3() {
}
@Test
public void test_4() {
}
@Test
public void test_5() {
}
@TestConfiguration
public static class TestKafkaConfig {
@Bean
EntitlementsCheckSerDes appSerDes() {
return new MockEntitlementsCheckSerDes();
}
}
}
对结果很满意,我推动了我的更改,只是为了注意到构建在我们的 CI 服务器上失败了。 运行 再次失败,这次失败与第一次不同。我让一位同事看了一下,他遇到了与 CI 服务器类似的相同故障经历。我 运行 在我的机器上至少构建了二十次,它总是通过。 运行 对我同事的一项一项测试也总是通过。
我们遇到的最常见的异常是主题 xyz 已经存在,但偶尔会出现一些其他异常,表明集群不能喜欢或相似。所有这些异常都向我们表明,尽管使用 DirtiesContext
annotation.The 的第一个测试 运行 是,但在下一次测试开始之前,先前测试中使用的嵌入式 Kafka 并未完全关闭总是过去。
我们都花了一整天的时间把头发拉下来,根本没办法让它发挥作用。我们尝试了 google 带我们去的任何地方,一点运气也没有。最后我们在测试中留下了唯一的一个测试场景(交互次数最多的那个)class 并禁用了其余的。
显然这不是一个可以接受的永久解决方案,我真的很想了解我们做错了什么以及如何解决它。
提前感谢您的投入。
不要使用固定端口ports = 9092,
- 默认情况下,嵌入式 kafka 将侦听操作系统选择的随机端口。
您应该将其用于您的测试用例。
您可以通过调用 this.kafkaBroker.getBrokersAsString()
获取代理地址。