Problem creating tests with Spring Cloud Stream Kafka Streams using EmbeddedKafka with MockSchemaRegistryClient
I would like to know how to test my Spring Cloud Stream Kafka Streams application.
The application looks like this:
Stream 1: topic1 > topic2
Stream 2: topic2 + topic3 (join) > topic4
Stream 3: topic4 > topic5
I tried different approaches such as the TestChannelBinder, but that approach only works with simple functions, not with Kafka Streams and Avro.
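For reference, this is roughly what such a TestChannelBinder test looks like for a plain function (a sketch only, assuming the application defines a Function<String, String> bean named uppercase; nothing comparable worked for my KStream/Avro functions):

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootTest(properties = "spring.cloud.function.definition=uppercase")
@Import(TestChannelBinderConfiguration.class)
class UppercaseFunctionTest {

    @Autowired
    private InputDestination input;

    @Autowired
    private OutputDestination output;

    @Test
    void uppercasesPayload() {
        // send to the function's input binding and read from its output binding,
        // all in-memory via the test binder
        input.send(MessageBuilder.withPayload("hello".getBytes()).build(), "uppercase-in-0");
        byte[] payload = output.receive(1000, "uppercase-out-0").getPayload();
        assertThat(new String(payload)).isEqualTo("HELLO");
    }
}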
I decided to use EmbeddedKafka together with the MockSchemaRegistryClient. I can produce to a topic and consume from that same topic again (topic1), but I am not able to consume from topic2.
In my test application.yaml I put the following configuration (for now I am only testing the first stream; I want to extend it once this works):
spring.application.name: processingapp
spring.cloud:
  function.definition: stream1 # not now ;stream2;stream3
  stream:
    bindings:
      stream1-in-0:
        destination: topic1
      stream1-out-0:
        destination: topic2
    kafka:
      binder:
        min-partition-count: 1
        replication-factor: 1
        auto-create-topics: true
        auto-add-partitions: true
      bindings:
        default:
          consumer:
            autoRebalanceEnabled: true
            resetOffsets: true
            startOffset: earliest
        stream1-in-0:
          consumer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        stream1-out-0:
          producer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      streams:
        binder:
          configuration:
            schema.registry.url: mock://localtest
            specific.avro.reader: true
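For context, the stream1 definition referenced by those bindings is a Kafka Streams function bean along these lines (a sketch only; Test1 and Test2 are my Avro-generated classes and the mapping body is just a placeholder):

import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;

// annotation fully qualified only because the class itself is called Configuration
@org.springframework.context.annotation.Configuration
public class Configuration {

    @Bean
    public Function<KStream<Long, Test1>, KStream<Long, Test2>> stream1() {
        // Spring Cloud Stream binds the input of this function to stream1-in-0 (topic1)
        // and its output to stream1-out-0 (topic2)
        return input -> input.mapValues(value -> toTest2(value)); // placeholder mapping
    }

    private Test2 toTest2(Test1 value) {
        // the real transformation lives in the application; this is only illustrative
        return new Test2();
    }
}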
My test looks like this:
@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {

    private static final String INPUT_TOPIC = "topic1";
    private static final String OUTPUT_TOPIC = "topic2";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 1, INPUT_TOPIC, OUTPUT_TOPIC);

    @BeforeClass
    public static void setup() {
        System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getEmbeddedKafka().getBrokersAsString());
    }

    @org.junit.Test
    public void testSendReceive() throws IOException {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        senderProps.put("key.serializer", LongSerializer.class);
        senderProps.put("value.serializer", SpecificAvroSerializer.class);
        senderProps.put("schema.registry.url", "mock://localtest");
        AvroFileParser fileParser = new AvroFileParser();
        DefaultKafkaProducerFactory<Long, Test1> pf = new DefaultKafkaProducerFactory<>(senderProps);
        KafkaTemplate<Long, Test1> template = new KafkaTemplate<>(pf, true);
        Test1 test1 = fileParser.parseTest1("src/test/resources/mocks/test1.json");

        template.send(INPUT_TOPIC, 123456L, test1);
        System.out.println("produced");

        Map<String, Object> consumer1Props = KafkaTestUtils.consumerProps("testConsumer1", "false", embeddedKafka.getEmbeddedKafka());
        consumer1Props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumer1Props.put("key.deserializer", LongDeserializer.class);
        consumer1Props.put("value.deserializer", SpecificAvroDeserializer.class);
        consumer1Props.put("schema.registry.url", "mock://localtest");
        DefaultKafkaConsumerFactory<Long, Test1> cf = new DefaultKafkaConsumerFactory<>(consumer1Props);

        Consumer<Long, Test1> consumer1 = cf.createConsumer();
        consumer1.subscribe(Collections.singleton(INPUT_TOPIC));
        ConsumerRecords<Long, Test1> records = consumer1.poll(Duration.ofSeconds(10));
        consumer1.commitSync();

        System.out.println("records count?");
        System.out.println("" + records.count());

        Test1 fetchedTest1;
        fetchedTest1 = records.iterator().next().value();
        assertThat(records.count()).isEqualTo(1);
        System.out.println("found record");
        System.out.println(fetchedTest1.toString());

        Map<String, Object> consumer2Props = KafkaTestUtils.consumerProps("testConsumer2", "false", embeddedKafka.getEmbeddedKafka());
        consumer2Props.put("key.deserializer", StringDeserializer.class);
        consumer2Props.put("value.deserializer", TestAvroDeserializer.class);
        consumer2Props.put("schema.registry.url", "mock://localtest");
        DefaultKafkaConsumerFactory<String, Test2> consumer2Factory = new DefaultKafkaConsumerFactory<>(consumer2Props);

        Consumer<String, Test2> consumer2 = consumer2Factory.createConsumer();
        consumer2.subscribe(Collections.singleton(OUTPUT_TOPIC));
        ConsumerRecords<String, Test2> records2 = consumer2.poll(Duration.ofSeconds(30));
        consumer2.commitSync();

        if (records2.iterator().hasNext()) {
            System.out.println("has next");
        } else {
            System.out.println("has no next");
        }
    }
}
I get the following exception when trying to consume and deserialize from topic2:
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 0
Caused by: java.io.IOException: Cannot get schema from schema registry!
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:193) ~[kafka-schema-registry-client-6.2.0.jar:na]
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndId(MockSchemaRegistryClient.java:249) ~[kafka-schema-registry-client-6.2.0.jar:na]
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaById(MockSchemaRegistryClient.java:232) ~[kafka-schema-registry-client-6.2.0.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:307) ~[kafka-avro-serializer-6.2.0.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:107) ~[kafka-avro-serializer-6.2.0.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:86) ~[kafka-avro-serializer-6.2.0.jar:na]
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) ~[kafka-avro-serializer-6.2.0.jar:na]
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeKey(SourceNode.java:54) ~[kafka-streams-2.7.1.jar:na]
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:65) ~[kafka-streams-2.7.1.jar:na]
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176) ~[kafka-streams-2.7.1.jar:na]
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112) ~[kafka-streams-2.7.1.jar:na]
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185) ~[kafka-streams-2.7.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:895) ~[kafka-streams-2.7.1.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1008) ~[kafka-streams-2.7.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:812) ~[kafka-streams-2.7.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) ~[kafka-streams-2.7.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:564) ~[kafka-streams-2.7.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:523) ~[kafka-streams-2.7.1.jar:na]
No messages are consumed.
So I tried to override the SpecificAvroSerde, register the schema directly, and use this deserializer:
public class TestAvroDeserializer<T extends org.apache.avro.specific.SpecificRecord>
        extends SpecificAvroDeserializer<T> implements Deserializer<T> {

    private final KafkaAvroDeserializer inner;

    public TestAvroDeserializer() throws IOException, RestClientException {
        MockSchemaRegistryClient mockedClient = new MockSchemaRegistryClient();
        Schema.Parser parser = new Schema.Parser();
        Schema test2Schema = parser.parse(new File("./src/main/resources/avro/test2.avsc"));
        mockedClient.register("test2-value", test2Schema, 1, 0);
        inner = new KafkaAvroDeserializer(mockedClient);
    }

    /**
     * For testing purposes only.
     */
    TestAvroDeserializer(final SchemaRegistryClient client) throws IOException, RestClientException {
        MockSchemaRegistryClient mockedClient = new MockSchemaRegistryClient();
        Schema.Parser parser = new Schema.Parser();
        Schema test2Schema = parser.parse(new File("./src/main/resources/avro/test2.avsc"));
        mockedClient.register("test2-value", test2Schema, 1, 0);
        inner = new KafkaAvroDeserializer(mockedClient);
    }
}
Using this deserializer does not work either. Does anyone have experience with how to do this kind of test with EmbeddedKafka and the MockSchemaRegistry? Or should I use a different approach?
I would be very glad if someone could help. Thanks in advance.
I found a suitable way to integration-test my topology.
I use the TopologyTestDriver from the kafka-streams-test-utils package.
Include this dependency in Maven (add a <version> matching your Kafka version if it is not managed by a BOM):
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <scope>test</scope>
</dependency>
For the application described in the question, setting up the TopologyTestDriver looks like this. The code simply shows, step by step, how it works.
@Test
void test() {
    // keySerde, valueSerdeTopic1 and valueSerdeTopic2 are the Avro serdes used by the topology,
    // topic1AvroModel is a test record of the topic1 Avro type (both declared elsewhere in the test class)
    keySerde.configure(Map.of(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://schemas"), true);
    valueSerdeTopic1.configure(Map.of(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://schemas"), false);
    valueSerdeTopic2.configure(Map.of(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://schemas"), false);

    final StreamsBuilder builder = new StreamsBuilder();
    Configuration config = new Configuration(); // class where you declare your spring cloud stream functions
    KStream<String, Topic1> input = builder.stream("topic1", Consumed.with(keySerde, valueSerdeTopic1));
    KStream<String, Topic2> output = config.stream1().apply(input);
    output.to("topic2");
    Topology topology = builder.build();

    Properties streamsConfig = new Properties();
    streamsConfig.putAll(Map.of(
            org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG, "topology-test-driver",
            org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ignored",
            KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://schemas",
            org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, PrimitiveAvroSerde.class.getName(),
            org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName()
    ));

    TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamsConfig);
    TestInputTopic<String, Topic1> inputTopic = testDriver.createInputTopic("topic1", keySerde.serializer(), valueSerdeTopic1.serializer());
    TestOutputTopic<String, Topic2> outputTopic = testDriver.createOutputTopic("topic2", keySerde.deserializer(), valueSerdeTopic2.deserializer());

    inputTopic.pipeInput("key", topic1AvroModel); // Write to the input topic, which runs the topology of your spring-cloud-stream app
    KeyValue<String, Topic2> outputRecord = outputTopic.readKeyValue(); // Read from the output topic
}
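From there you can assert on the output record and release the driver; a short sketch continuing the test above (the expected values are of course specific to your own model):

assertThat(outputRecord.key).isEqualTo("key");   // the key piped in above
assertThat(outputRecord.value).isNotNull();      // or compare against the expected Topic2 record
assertThat(outputTopic.isEmpty()).isTrue();      // no unexpected additional records
testDriver.close();                              // frees the driver's state stores and local state directory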
If you write more tests, I recommend abstracting the setup code so you do not repeat yourself in every test.
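One way to do that (a sketch, assuming JUnit 5, where buildTopology() and buildStreamsConfig() are hypothetical helpers wrapping the setup shown above) is to create the driver in a @BeforeEach and close it in an @AfterEach of the same test class:

private TopologyTestDriver testDriver;
private TestInputTopic<String, Topic1> inputTopic;
private TestOutputTopic<String, Topic2> outputTopic;

@BeforeEach
void setUp() {
    // buildTopology() / buildStreamsConfig() contain the serde, topology and Properties setup from the test above
    testDriver = new TopologyTestDriver(buildTopology(), buildStreamsConfig());
    inputTopic = testDriver.createInputTopic("topic1", keySerde.serializer(), valueSerdeTopic1.serializer());
    outputTopic = testDriver.createOutputTopic("topic2", keySerde.deserializer(), valueSerdeTopic2.deserializer());
}

@AfterEach
void tearDown() {
    testDriver.close(); // always release the driver's state, even when a test fails
}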
I highly recommend this example from the spring-cloud-streams-samples repository; it led me to the solution with the TopologyTestDriver.