如何使用 spring-boot EmbeddedKafka 对 KStream 拓扑进行集成测试?
How to do integration testing of KStream topology using spring-boot EmbeddedKafka?
我有一个简单的 spring-boot KStream 拓扑,可以将字符串从小写转换为大写。我希望我的集成测试启动嵌入式 kafka,然后测试拓扑。我想知道是否可以使用 spring @EmbeddedKafka
?
编写这样的集成测试
我见过几个使用 @EmbeddedKafka
的例子,简单的消费者使用 @KafkaListener
,但没有看到任何使用 KStream 的例子。
我尝试测试以下拓扑以将传入文本流从小写转换为大写。
拓扑结构如下:
@Configuration
public class UppercaseStream {
private static final String LOWERCASE_TOPIC = "t.lower.case";
private static final String UPPERCASE_TOPIC = "t.upper.case";
@Bean
@Qualifier("kStreamPromoToUppercase")
public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {
KStream<String, String> sourceStream = builder
.stream(LOWERCASE_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
sourceStream.print(Printed.<String, String>toSysOut().withLabel("Original KStream..."));
KStream<String, String> upperCaseStream = sourceStream.mapValues(text -> text.toUpperCase());
upperCaseStream.print(Printed.<String, String>toSysOut().withLabel("Uppercase KStream..."));
upperCaseStream.to(UPPERCASE_TOPIC);
return upperCaseStream;
}
}
测试拓扑的单元测试是:
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class UpperCaseTopologyTest {
TopologyTestDriver testDriver;
@AfterAll
void tearDown(){
testDriver.close();
}
@Test
@DisplayName("should transform lowercase to uppercase words")
void shouldTransformLowercaseWords() {
//Given
StreamsBuilder builder = new StreamsBuilder();
new UppercaseStream().kStreamPromoToUppercase(builder);
Topology topology = builder.build();
// setup test driver
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
//Create a Topology Test Driver
testDriver = new TopologyTestDriver(topology, props);
TestInputTopic<String, String> inputTopic = testDriver.createInputTopic("t.lower.case", new Serdes.StringSerde().serializer(), new Serdes.StringSerde().serializer());
TestOutputTopic<String, String> outputTopic = testDriver.createOutputTopic("t.upper.case", new Serdes.StringSerde().deserializer(), new Serdes.StringSerde().deserializer());
//When
inputTopic.pipeInput("test");
//Then
assertThat(outputTopic.readValue()).isEqualTo("TEST");
}
}
我想编写一个集成测试,首先启动一个嵌入式 kafka 服务器,然后测试 UppercaseStream 拓扑。
我尝试了以下方法:
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaIntegrationTest {
@Autowired
public KafkaTemplate<String, String> template;
@Autowired
private KafkaConsumer consumer;
private KafkaStreams kafkaStreams;
@Value("${test.topic}")
private String topic;
@Autowired
private KafkaStreamsConfiguration kafkaStreamsConfiguration;
@Test
public void should_transform_lowercase_to_uppercase() throws Exception {
//Create a StreamsBuilder
StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.stream(topic, Consumed.with(new Serdes.StringSerde(), new Serdes.StringSerde()));
//Add a topology
new UppercaseStream().kStreamPromoToUppercase(streamsBuilder);
kafkaStreams = new KafkaStreams(streamsBuilder.build(), kafkaStreamsConfiguration.asProperties());
kafkaStreams.start();
template.send(topic, "test");
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), equalTo(0L));
assertThat(consumer.getPayload(), containsString("TEST"));
}
@After
public void tearDown() {
if (kafkaStreams!= null) kafkaStreams.close();
}
}
测试未通过断言。我不确定如何获取 kStreamPromoToUppercase bean。我不确定我是否在尝试遵循正确的方法。
不清楚你的KafkaConsumer
是什么;大概是 Consumer<K, V>
.
周围的一些包装器
也许您的消费者没有auto.offset.reset=earliest
。这是此类测试的常见错误,因为在发送记录后消费者可能会开始比赛;默认是 latest
所以你不会得到这样的记录。
该框架有许多使用嵌入式 kafka 代理的 Kafka Streams 测试用例。
集成测试中缺少一些东西。
需要一对 NewTopic
kafka 客户端管理对象来表示输入和输出主题
@Bean public NewTopic createInputTopic() { return new NewTopic(inputTopic,Optional.of(1), Optional.empty()); }
另一个是输出主题
@Bean public NewTopic createOutputTopic() { return new NewTopic(outputTopic,Optional.of(1), Optional.empty()); }
测试的其余部分大致相同。正如@Garry 所建议的,我使用了 kafka 消费者。
@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class KStreamSampleApplicationTests {
private final KafkaProperties kafkaProperties;
private final String inputTopic;
private final String outputTopic;
@Autowired
public KStreamSampleApplicationTests(KafkaProperties kafkaProperties, Environment env) {
this.kafkaProperties = kafkaProperties;
this.inputTopic = env.getProperty("spring.kafka.input-lowercase-topic");
this.outputTopic = env.getProperty("spring.kafka.output-uppercase-topic");
}
@Test
@DisplayName("should test uppercaseStream topology")
void shouldTestUppercaseStreamTopology() {
//Given
Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(
String.join(",", kafkaProperties.getBootstrapServers())));
//Create a kafka producer
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(producerProps, new StringSerializer(), new StringSerializer()).createProducer();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(String.join(",", kafkaProperties.getBootstrapServers()), "testGroup", "true");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//Create a Consumer client
Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(consumerProps, new StringDeserializer(), new StringDeserializer()).createConsumer();
consumer.subscribe(Collections.singleton(outputTopic));
//When
producer.send(new ProducerRecord<>(inputTopic, "test"));
producer.flush();
//Then
assertThat(producer).isNotNull();
//And
ConsumerRecords<String, String> rec = consumer.poll(Duration.ofSeconds(3));
Iterable<ConsumerRecord<String, String>> records = rec.records(outputTopic);
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
if (!iterator.hasNext()) Assertions.fail();
ConsumerRecord<String, String> next = iterator.next();
assertThat(next.value()).isEqualTo("TEST");
}
}
这是完整重构解决方案的 gist。
我有一个简单的 spring-boot KStream 拓扑,可以将字符串从小写转换为大写。我希望我的集成测试启动嵌入式 kafka,然后测试拓扑。我想知道是否可以使用 spring @EmbeddedKafka
?
我见过几个使用 @EmbeddedKafka
的例子,简单的消费者使用 @KafkaListener
,但没有看到任何使用 KStream 的例子。
我尝试测试以下拓扑以将传入文本流从小写转换为大写。
拓扑结构如下:
@Configuration
public class UppercaseStream {
private static final String LOWERCASE_TOPIC = "t.lower.case";
private static final String UPPERCASE_TOPIC = "t.upper.case";
@Bean
@Qualifier("kStreamPromoToUppercase")
public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {
KStream<String, String> sourceStream = builder
.stream(LOWERCASE_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
sourceStream.print(Printed.<String, String>toSysOut().withLabel("Original KStream..."));
KStream<String, String> upperCaseStream = sourceStream.mapValues(text -> text.toUpperCase());
upperCaseStream.print(Printed.<String, String>toSysOut().withLabel("Uppercase KStream..."));
upperCaseStream.to(UPPERCASE_TOPIC);
return upperCaseStream;
}
}
测试拓扑的单元测试是:
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class UpperCaseTopologyTest {
TopologyTestDriver testDriver;
@AfterAll
void tearDown(){
testDriver.close();
}
@Test
@DisplayName("should transform lowercase to uppercase words")
void shouldTransformLowercaseWords() {
//Given
StreamsBuilder builder = new StreamsBuilder();
new UppercaseStream().kStreamPromoToUppercase(builder);
Topology topology = builder.build();
// setup test driver
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
//Create a Topology Test Driver
testDriver = new TopologyTestDriver(topology, props);
TestInputTopic<String, String> inputTopic = testDriver.createInputTopic("t.lower.case", new Serdes.StringSerde().serializer(), new Serdes.StringSerde().serializer());
TestOutputTopic<String, String> outputTopic = testDriver.createOutputTopic("t.upper.case", new Serdes.StringSerde().deserializer(), new Serdes.StringSerde().deserializer());
//When
inputTopic.pipeInput("test");
//Then
assertThat(outputTopic.readValue()).isEqualTo("TEST");
}
}
我想编写一个集成测试,首先启动一个嵌入式 kafka 服务器,然后测试 UppercaseStream 拓扑。
我尝试了以下方法:
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaIntegrationTest {
@Autowired
public KafkaTemplate<String, String> template;
@Autowired
private KafkaConsumer consumer;
private KafkaStreams kafkaStreams;
@Value("${test.topic}")
private String topic;
@Autowired
private KafkaStreamsConfiguration kafkaStreamsConfiguration;
@Test
public void should_transform_lowercase_to_uppercase() throws Exception {
//Create a StreamsBuilder
StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.stream(topic, Consumed.with(new Serdes.StringSerde(), new Serdes.StringSerde()));
//Add a topology
new UppercaseStream().kStreamPromoToUppercase(streamsBuilder);
kafkaStreams = new KafkaStreams(streamsBuilder.build(), kafkaStreamsConfiguration.asProperties());
kafkaStreams.start();
template.send(topic, "test");
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), equalTo(0L));
assertThat(consumer.getPayload(), containsString("TEST"));
}
@After
public void tearDown() {
if (kafkaStreams!= null) kafkaStreams.close();
}
}
测试未通过断言。我不确定如何获取 kStreamPromoToUppercase bean。我不确定我是否在尝试遵循正确的方法。
不清楚你的KafkaConsumer
是什么;大概是 Consumer<K, V>
.
也许您的消费者没有auto.offset.reset=earliest
。这是此类测试的常见错误,因为在发送记录后消费者可能会开始比赛;默认是 latest
所以你不会得到这样的记录。
该框架有许多使用嵌入式 kafka 代理的 Kafka Streams 测试用例。
集成测试中缺少一些东西。
需要一对 NewTopic
kafka 客户端管理对象来表示输入和输出主题
@Bean public NewTopic createInputTopic() { return new NewTopic(inputTopic,Optional.of(1), Optional.empty()); }
另一个是输出主题
@Bean public NewTopic createOutputTopic() { return new NewTopic(outputTopic,Optional.of(1), Optional.empty()); }
测试的其余部分大致相同。正如@Garry 所建议的,我使用了 kafka 消费者。
@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class KStreamSampleApplicationTests {
private final KafkaProperties kafkaProperties;
private final String inputTopic;
private final String outputTopic;
@Autowired
public KStreamSampleApplicationTests(KafkaProperties kafkaProperties, Environment env) {
this.kafkaProperties = kafkaProperties;
this.inputTopic = env.getProperty("spring.kafka.input-lowercase-topic");
this.outputTopic = env.getProperty("spring.kafka.output-uppercase-topic");
}
@Test
@DisplayName("should test uppercaseStream topology")
void shouldTestUppercaseStreamTopology() {
//Given
Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(
String.join(",", kafkaProperties.getBootstrapServers())));
//Create a kafka producer
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(producerProps, new StringSerializer(), new StringSerializer()).createProducer();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(String.join(",", kafkaProperties.getBootstrapServers()), "testGroup", "true");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//Create a Consumer client
Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(consumerProps, new StringDeserializer(), new StringDeserializer()).createConsumer();
consumer.subscribe(Collections.singleton(outputTopic));
//When
producer.send(new ProducerRecord<>(inputTopic, "test"));
producer.flush();
//Then
assertThat(producer).isNotNull();
//And
ConsumerRecords<String, String> rec = consumer.poll(Duration.ofSeconds(3));
Iterable<ConsumerRecord<String, String>> records = rec.records(outputTopic);
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
if (!iterator.hasNext()) Assertions.fail();
ConsumerRecord<String, String> next = iterator.next();
assertThat(next.value()).isEqualTo("TEST");
}
}
这是完整重构解决方案的 gist。