Spring Cloud Stream Kafka 消费者测试
Spring Cloud Stream Kafka Consumer Test
我正尝试按照 GitHub a link
中的建议设置测试
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("words");
template.sendDefault("foobar");
--> ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "output");
log.debug(cr);
}
finally {
pf.destroy();
}
其中 StreamProcessor 设置为
@StreamListener
@SendTo("output")
public KStream<?, WordCount> process(@Input("input") KStream<Object, String> input) {
return input.map((key, value) -> new KeyValue<>(value, new WordCount(value, 10, new Date(), new Date())));
}
--> 由于@Streamprocessor 具有@SendTo("output")
,因此我认为该行从不消耗消息,该消息应该与主题"output" 相关
- 我希望能够测试流处理的消息。
您需要从您的 output
绑定到的实际主题中消费。
你有 spring.cloud.stream.bindings.output.destination
的配置吗?那应该是您需要使用的值。如果您不设置它,默认值将与绑定相同 - 在本例中为 output
。
我正尝试按照 GitHub a link
中的建议设置测试 Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("words");
template.sendDefault("foobar");
--> ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "output");
log.debug(cr);
}
finally {
pf.destroy();
}
其中 StreamProcessor 设置为
@StreamListener
@SendTo("output")
public KStream<?, WordCount> process(@Input("input") KStream<Object, String> input) {
return input.map((key, value) -> new KeyValue<>(value, new WordCount(value, 10, new Date(), new Date())));
}
--> 由于@Streamprocessor 具有@SendTo("output")
,因此我认为该行从不消耗消息,该消息应该与主题"output" 相关- 我希望能够测试流处理的消息。
您需要从您的 output
绑定到的实际主题中消费。
你有 spring.cloud.stream.bindings.output.destination
的配置吗?那应该是您需要使用的值。如果您不设置它,默认值将与绑定相同 - 在本例中为 output
。