如何通过在 Kafka 集成测试中发送消息来设置应用程序状态?
How to setup an application state by sending messages in Kafka integration tests?
我想使用用于 Kafka 绑定的测试容器对我的应用程序进行集成测试。
在以下场景中,测试设置永远不会超过 waitForMessagesToBeDelivered
,它会在收到消息后永远等待创建实体。在我的理解中,它应该通过 Kafka 传递两条消息,这些消息应该由 MyConsumer
接收,然后应该创建两个 MyEntity
s,这应该增加 myEntityRepository.count()
.[=19= 返回的值]
在我为其创建此复制设置的类似场景中,接收到两条消息中的一条,并为其创建了一个实体。
@Service
public class MyConsumer {
private final static Logger LOGGER = LoggerFactory.getLogger(MyConsumer.class);
private final MyEntityRepository myEntityRepository;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final String TOPIC_ENTITIES = "entities";
@Autowired
public MyConsumer(MyEntityRepository myEntityRepository) {
this.myEntityRepository = myEntityRepository;
}
@KafkaListener(topics = TOPIC_ENTITIES, groupId = "MyEntity")
public void consume(String message) throws IOException {
LOGGER.info(String.format("#### -> Consumed message -> %s", message));
MyEntityCreatedMessage deserializedMessage = OBJECT_MAPPER.readValue(message, MyEntityCreatedMessage.class);
myEntityRepository.save(new MyEntity(deserializedMessage.getUuid()));
LOGGER.info("persisted created entity with uuid {}",
deserializedMessage.getUuid());
}
}
还有一个
@Configuration
public class MySpringKafkaConfiguration {
@Bean
public NewTopic topicEntities() {
return new NewTopic(TOPIC_ENTITIES, 10, (short) 2);
}
}
我想用它来测试
@ExtendWith(SpringExtension.class)
@WebAppConfiguration
@SpringBootTest(classes = {MySpringKafkaApplication.class})
@ContextConfiguration(initializers = MyConsumerIT.TestcontainersInitializer.class)
@Testcontainers
public class MyConsumerIT {
private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumerIT.class);
private static KafkaContainer kafkaContainer = new KafkaContainer();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final UUID ENTITY1_UUID = UUID.randomUUID();
private static final UUID ENTITY2_UUID = UUID.randomUUID();
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private MyEntityRepository myEntityRepository;
@BeforeAll
public static void setUpClass() {
kafkaContainer.start();
}
@AfterAll
public static void tearDownClass() {
kafkaContainer.stop();
}
@BeforeEach
public void setUp() throws JsonProcessingException, InterruptedException {
final String serializedMessage = OBJECT_MAPPER.writeValueAsString(new MyEntityCreatedMessage(ENTITY1_UUID));
kafkaTemplate.send(TOPIC_ENTITIES, serializedMessage);
final String serializedMessageAdmin = OBJECT_MAPPER.writeValueAsString(new MyEntityCreatedMessage(ENTITY2_UUID));
kafkaTemplate.send(TOPIC_ENTITIES, serializedMessageAdmin);
waitForMessagesToBeDelivered();
}
private void waitForMessagesToBeDelivered() throws InterruptedException {
while(myEntityRepository.count() != 2) {
LOGGER.info(String.format("userRepository.count: %d",
myEntityRepository.count()));
Thread.sleep(500);
}
}
@Test
public void testSomethingWhichRequiresTwoMyEntities() {
}
/* default */ static class TestcontainersInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
@Override
public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
String kafkaContainerBootstrapServers = kafkaContainer.getBootstrapServers();
LOGGER.info("kafkaContainerBootstrapServers: {}",
kafkaContainerBootstrapServers);
TestPropertyValues
.of("spring.kafka.consumer.bootstrap-servers=" + kafkaContainerBootstrapServers)
.applyTo(configurableApplicationContext.getEnvironment());
TestPropertyValues
.of("spring.kafka.producer.bootstrap-servers=" + kafkaContainerBootstrapServers)
.applyTo(configurableApplicationContext.getEnvironment());
}
}
}
在 logs 我看到了
2019-11-15 17:01:44.575 ERROR 30056 --- [ main] o.springframework.kafka.core.KafkaAdmin : Could not configure topics
org.springframework.kafka.KafkaException: Timed out waiting to get existing topics; nested exception is java.util.concurrent.TimeoutException
但是似乎连接成功了,不过我不确定。
我知道嵌入式 kafka 等替代方案,但我想使用 testcontainers 以提高集成的真实性。
您在发送消息和消费者开始之间进行竞赛。默认情况下,新的 Kafka 消费者从主题的末尾开始消费。
添加:
spring.kafka.consumer.auto-offset-reset=earliest
将确保消费者获得主题中的任何现有记录。
我想使用用于 Kafka 绑定的测试容器对我的应用程序进行集成测试。
在以下场景中,测试设置永远不会超过 waitForMessagesToBeDelivered
,它会在收到消息后永远等待创建实体。在我的理解中,它应该通过 Kafka 传递两条消息,这些消息应该由 MyConsumer
接收,然后应该创建两个 MyEntity
s,这应该增加 myEntityRepository.count()
.[=19= 返回的值]
在我为其创建此复制设置的类似场景中,接收到两条消息中的一条,并为其创建了一个实体。
@Service
public class MyConsumer {
private final static Logger LOGGER = LoggerFactory.getLogger(MyConsumer.class);
private final MyEntityRepository myEntityRepository;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final String TOPIC_ENTITIES = "entities";
@Autowired
public MyConsumer(MyEntityRepository myEntityRepository) {
this.myEntityRepository = myEntityRepository;
}
@KafkaListener(topics = TOPIC_ENTITIES, groupId = "MyEntity")
public void consume(String message) throws IOException {
LOGGER.info(String.format("#### -> Consumed message -> %s", message));
MyEntityCreatedMessage deserializedMessage = OBJECT_MAPPER.readValue(message, MyEntityCreatedMessage.class);
myEntityRepository.save(new MyEntity(deserializedMessage.getUuid()));
LOGGER.info("persisted created entity with uuid {}",
deserializedMessage.getUuid());
}
}
还有一个
@Configuration
public class MySpringKafkaConfiguration {
@Bean
public NewTopic topicEntities() {
return new NewTopic(TOPIC_ENTITIES, 10, (short) 2);
}
}
我想用它来测试
@ExtendWith(SpringExtension.class)
@WebAppConfiguration
@SpringBootTest(classes = {MySpringKafkaApplication.class})
@ContextConfiguration(initializers = MyConsumerIT.TestcontainersInitializer.class)
@Testcontainers
public class MyConsumerIT {
private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumerIT.class);
private static KafkaContainer kafkaContainer = new KafkaContainer();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final UUID ENTITY1_UUID = UUID.randomUUID();
private static final UUID ENTITY2_UUID = UUID.randomUUID();
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private MyEntityRepository myEntityRepository;
@BeforeAll
public static void setUpClass() {
kafkaContainer.start();
}
@AfterAll
public static void tearDownClass() {
kafkaContainer.stop();
}
@BeforeEach
public void setUp() throws JsonProcessingException, InterruptedException {
final String serializedMessage = OBJECT_MAPPER.writeValueAsString(new MyEntityCreatedMessage(ENTITY1_UUID));
kafkaTemplate.send(TOPIC_ENTITIES, serializedMessage);
final String serializedMessageAdmin = OBJECT_MAPPER.writeValueAsString(new MyEntityCreatedMessage(ENTITY2_UUID));
kafkaTemplate.send(TOPIC_ENTITIES, serializedMessageAdmin);
waitForMessagesToBeDelivered();
}
private void waitForMessagesToBeDelivered() throws InterruptedException {
while(myEntityRepository.count() != 2) {
LOGGER.info(String.format("userRepository.count: %d",
myEntityRepository.count()));
Thread.sleep(500);
}
}
@Test
public void testSomethingWhichRequiresTwoMyEntities() {
}
/* default */ static class TestcontainersInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
@Override
public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
String kafkaContainerBootstrapServers = kafkaContainer.getBootstrapServers();
LOGGER.info("kafkaContainerBootstrapServers: {}",
kafkaContainerBootstrapServers);
TestPropertyValues
.of("spring.kafka.consumer.bootstrap-servers=" + kafkaContainerBootstrapServers)
.applyTo(configurableApplicationContext.getEnvironment());
TestPropertyValues
.of("spring.kafka.producer.bootstrap-servers=" + kafkaContainerBootstrapServers)
.applyTo(configurableApplicationContext.getEnvironment());
}
}
}
在 logs 我看到了
2019-11-15 17:01:44.575 ERROR 30056 --- [ main] o.springframework.kafka.core.KafkaAdmin : Could not configure topics
org.springframework.kafka.KafkaException: Timed out waiting to get existing topics; nested exception is java.util.concurrent.TimeoutException
但是似乎连接成功了,不过我不确定。
我知道嵌入式 kafka 等替代方案,但我想使用 testcontainers 以提高集成的真实性。
您在发送消息和消费者开始之间进行竞赛。默认情况下,新的 Kafka 消费者从主题的末尾开始消费。
添加:
spring.kafka.consumer.auto-offset-reset=earliest
将确保消费者获得主题中的任何现有记录。