How can I unit test a KStream with a Kafka binder?
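I want to unit test a Kafka Streams aggregate, but I have no idea which approach to use.
I read about TestSupportBinder, but I don't think it applies to my case, so I went with the KafkaEmbedded approach. This is how I initialize the embedded Kafka: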
    @Before
    public void setUp() throws Exception {
        // Consumer properties pointing at the embedded broker; auto-commit is disabled ("false").
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group-id", "false", embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Values are deserialized as String here, although the factory below is typed for LoggerMessage.
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        DefaultKafkaConsumerFactory<Object, LoggerMessage> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        consumer = cf.createConsumer();
        // Subscribe the test consumer to the output topic of the stream under test.
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, OUTPUT_TOPIC);
    }
This is what I want to test:
    public interface Channels {
        String LOGGER_IN_STREAM = "logger-topic-in-stream";
        String LOGGER_IN = "logger-topic-in";
        String LOGGERDATAVALIDATED_OUT = "loggerDataValidated-topic-out";

        @Input(Channels.LOGGER_IN)
        SubscribableChannel processMessage();

        @Input(Channels.LOGGER_IN_STREAM)
        KStream<Object, LoggerMessage> loggerKstreamIn();

        @Output(Channels.LOGGERDATAVALIDATED_OUT)
        MessageChannel validateLoggerData();
    }
I get the following error message:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'some.domain.Channels': Invocation of init method failed; nested exception is java.lang.IllegalStateException: No factory found for binding target type: org.apache.kafka.streams.kstream.KStream among registered factories: channelFactory,messageSourceFactory
Caused by: java.lang.IllegalStateException: No factory found for binding target type: org.apache.kafka.streams.kstream.KStream among registered factories: channelFactory,messageSourceFactory
What am I doing wrong?
I had forgotten to inject my channels interface as a MockBean. After I did that, everything worked as expected.
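To make the error message above easier to read, here is a minimal plain-Java sketch of the kind of lookup that could produce it. It is a simplified model, not Spring Cloud Stream's actual code: the class BindingFactoryLookupSketch, the TargetFactory interface, the stand-in marker types, and the bean name kstreamFactory are all hypothetical. Only the two factory names and the wording of the message are taken from the stack trace.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model only: every type below is a stand-in, not the Spring or Kafka class.
class BindingFactoryLookupSketch {

    interface SubscribableChannel {}
    interface PollableMessageSource {}
    interface KStream {}

    /** Hypothetical factory contract: can this factory build the given target type? */
    interface TargetFactory {
        boolean canCreate(Class<?> targetType);
    }

    /** The two factories named in the error message, in the same order. */
    static Map<String, TargetFactory> defaultFactories() {
        Map<String, TargetFactory> factories = new LinkedHashMap<>();
        factories.put("channelFactory", t -> t == SubscribableChannel.class);
        factories.put("messageSourceFactory", t -> t == PollableMessageSource.class);
        return factories;
    }

    /** Returns the name of the first factory that accepts the type, or fails like the trace does. */
    static String findFactory(Map<String, TargetFactory> factories, Class<?> targetType) {
        for (Map.Entry<String, TargetFactory> entry : factories.entrySet()) {
            if (entry.getValue().canCreate(targetType)) {
                return entry.getKey();
            }
        }
        throw new IllegalStateException("No factory found for binding target type: "
                + targetType.getSimpleName() + " among registered factories: "
                + String.join(",", factories.keySet()));
    }

    public static void main(String[] args) {
        Map<String, TargetFactory> factories = defaultFactories();
        // A message-channel binding resolves against the default factories.
        System.out.println(findFactory(factories, SubscribableChannel.class));
        try {
            // No registered factory accepts the KStream stand-in, so the lookup fails.
            findFactory(factories, KStream.class);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        // Assumption: once a KStream-capable factory is registered, the lookup succeeds.
        factories.put("kstreamFactory", t -> t == KStream.class); // hypothetical bean name
        System.out.println(findFactory(factories, KStream.class));
    }
}
```

Read this way, the trace suggests that no KStream-capable factory was registered in the test context. Replacing the Channels bean with a mock, as described above, presumably means the real binding proxy is never initialized, so the lookup for the KStream input is skipped altogether.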