如何通过在 Kafka 集成测试中发送消息来设置应用程序状态?

How to setup an application state by sending messages in Kafka integration tests?

我想使用用于 Kafka 绑定的测试容器对我的应用程序进行集成测试。

在以下场景中,测试设置永远不会超过 waitForMessagesToBeDelivered,它会在收到消息后永远等待创建实体。在我的理解中,它应该通过 Kafka 传递两条消息,这些消息应该由 MyConsumer 接收,然后应该创建两个 MyEntitys,这应该增加 myEntityRepository.count().[=19= 返回的值]

在我为其创建此复制设置的类似场景中,接收到两条消息中的一条,并为其创建了一个实体。

@Service
public class MyConsumer {
    private final static Logger LOGGER = LoggerFactory.getLogger(MyConsumer.class);
    private final MyEntityRepository myEntityRepository;
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    public static final String TOPIC_ENTITIES = "entities";

    @Autowired
    public MyConsumer(MyEntityRepository myEntityRepository) {
        this.myEntityRepository = myEntityRepository;
    }

    @KafkaListener(topics = TOPIC_ENTITIES, groupId = "MyEntity")
    public void consume(String message) throws IOException {
        LOGGER.info(String.format("#### -> Consumed message -> %s", message));
        MyEntityCreatedMessage deserializedMessage = OBJECT_MAPPER.readValue(message, MyEntityCreatedMessage.class);
        myEntityRepository.save(new MyEntity(deserializedMessage.getUuid()));
        LOGGER.info("persisted created entity with uuid {}",
                deserializedMessage.getUuid());
    }
}

还有一个

@Configuration
public class MySpringKafkaConfiguration {

    @Bean
    public NewTopic topicEntities() {
        return new NewTopic(TOPIC_ENTITIES, 10, (short) 2);
    }
}

我想用它来测试

@ExtendWith(SpringExtension.class)
@WebAppConfiguration
@SpringBootTest(classes = {MySpringKafkaApplication.class})
@ContextConfiguration(initializers = MyConsumerIT.TestcontainersInitializer.class)
@Testcontainers
public class MyConsumerIT {
    private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumerIT.class);
    private static KafkaContainer kafkaContainer = new KafkaContainer();
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final UUID ENTITY1_UUID = UUID.randomUUID();
    private static final UUID ENTITY2_UUID = UUID.randomUUID();

    @Autowired
    private KafkaTemplate kafkaTemplate;
    @Autowired
    private MyEntityRepository myEntityRepository;

    @BeforeAll
    public static void setUpClass() {
        kafkaContainer.start();
    }

    @AfterAll
    public static void tearDownClass() {
        kafkaContainer.stop();
    }

    @BeforeEach
    public void setUp() throws JsonProcessingException, InterruptedException {
        final String serializedMessage = OBJECT_MAPPER.writeValueAsString(new MyEntityCreatedMessage(ENTITY1_UUID));
        kafkaTemplate.send(TOPIC_ENTITIES, serializedMessage);
        final String serializedMessageAdmin = OBJECT_MAPPER.writeValueAsString(new MyEntityCreatedMessage(ENTITY2_UUID));
        kafkaTemplate.send(TOPIC_ENTITIES, serializedMessageAdmin);
        waitForMessagesToBeDelivered();
    }

    private void waitForMessagesToBeDelivered() throws InterruptedException {
        while(myEntityRepository.count() != 2) {
            LOGGER.info(String.format("userRepository.count: %d",
                    myEntityRepository.count()));
            Thread.sleep(500);
        }
    }

    @Test
    public void testSomethingWhichRequiresTwoMyEntities() {
    }

    /* default */ static class TestcontainersInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {

        @Override
        public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
            String kafkaContainerBootstrapServers = kafkaContainer.getBootstrapServers();
            LOGGER.info("kafkaContainerBootstrapServers: {}",
                    kafkaContainerBootstrapServers);
            TestPropertyValues
                    .of("spring.kafka.consumer.bootstrap-servers=" + kafkaContainerBootstrapServers)
                    .applyTo(configurableApplicationContext.getEnvironment());
            TestPropertyValues
                    .of("spring.kafka.producer.bootstrap-servers=" + kafkaContainerBootstrapServers)
                    .applyTo(configurableApplicationContext.getEnvironment());
        }
    }
}

logs 我看到了

2019-11-15 17:01:44.575 ERROR 30056 --- [           main] o.springframework.kafka.core.KafkaAdmin  : Could not configure topics

org.springframework.kafka.KafkaException: Timed out waiting to get existing topics; nested exception is java.util.concurrent.TimeoutException

但是似乎连接成功了,不过我不确定。

我知道嵌入式 kafka 等替代方案,但我想使用 testcontainers 以提高集成的真实性。

您在发送消息和消费者开始之间进行竞赛。默认情况下,新的 Kafka 消费者从主题的末尾开始消费。

添加:

spring.kafka.consumer.auto-offset-reset=earliest

将确保消费者获得主题中的任何现有记录。