Problem creating tests with Spring Cloud Stream Kafka Streams using EmbeddedKafka with MockSchemaRegistryClient

I'd like to know how to test my Spring Cloud Stream Kafka Streams application.

The application looks like this:

Stream 1: topic1 > topic2
Stream 2: topic2 + topic3 joined > topic4
Stream 3: topic4 > topic5

I tried different approaches, such as the TestChannelBinder, but that approach only works with simple functions, not with Kafka Streams and Avro.

I decided to use EmbeddedKafka together with MockSchemaRegistryClient. I can produce to a topic and consume from that same topic (topic1) again, but I cannot consume from topic2.

In my test application.yaml I have the following configuration (for now I'm only testing the first stream; I want to extend it once that works):

spring.application.name: processingapp
spring.cloud:
  function.definition: stream1 # not now ;stream2;stream3
  stream:
    bindings:
      stream1-in-0:
        destination: topic1
      stream1-out-0:
        destination: topic2
    kafka:
      binder:
        min-partition-count: 1
        replication-factor: 1
        auto-create-topics: true
        auto-add-partitions: true
      bindings:
        default:
          consumer:
            autoRebalanceEnabled: true
            resetOffsets: true
            startOffset: earliest
        stream1-in-0:
          consumer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        stream1-out-0:
          producer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      streams:
        binder:
          configuration:
            schema.registry.url: mock://localtest
            specific.avro.reader: true

My test looks like this:

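import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)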
@SpringBootTest
public class Test {

    private static final String INPUT_TOPIC = "topic1";

    private static final String OUTPUT_TOPIC = "topic2";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 1, INPUT_TOPIC, OUTPUT_TOPIC);

    @BeforeClass
    public static void setup() {
        System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getEmbeddedKafka().getBrokersAsString());
    }

    @org.junit.Test
    public void testSendReceive() throws IOException {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        senderProps.put("key.serializer", LongSerializer.class);
        senderProps.put("value.serializer", SpecificAvroSerializer.class);
        senderProps.put("schema.registry.url", "mock://localtest");
        AvroFileParser fileParser = new AvroFileParser();
        DefaultKafkaProducerFactory<Long, Test1> pf = new DefaultKafkaProducerFactory<>(senderProps);
        KafkaTemplate<Long, Test1> template = new KafkaTemplate<>(pf, true);
        Test1 test1 = fileParser.parseTest1("src/test/resources/mocks/test1.json");

        template.send(INPUT_TOPIC, 123456L, test1);
        System.out.println("produced");
        
        Map<String, Object> consumer1Props = KafkaTestUtils.consumerProps("testConsumer1", "false", embeddedKafka.getEmbeddedKafka());
        consumer1Props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumer1Props.put("key.deserializer", LongDeserializer.class);
        consumer1Props.put("value.deserializer", SpecificAvroDeserializer.class);
        consumer1Props.put("schema.registry.url", "mock://localtest");
        DefaultKafkaConsumerFactory<Long, Test1> cf = new DefaultKafkaConsumerFactory<>(consumer1Props);

        Consumer<Long, Test1> consumer1 = cf.createConsumer();
        consumer1.subscribe(Collections.singleton(INPUT_TOPIC));
        ConsumerRecords<Long, Test1> records = consumer1.poll(Duration.ofSeconds(10));
        consumer1.commitSync();

        System.out.println("records count?");
        System.out.println("" + records.count());

        // Assert first, so an empty poll fails the assertion instead of throwing NoSuchElementException
        assertThat(records.count()).isEqualTo(1);
        Test1 fetchedTest1 = records.iterator().next().value();
        System.out.println("found record");
        System.out.println(fetchedTest1.toString());

        Map<String, Object> consumer2Props = KafkaTestUtils.consumerProps("testConsumer2", "false", embeddedKafka.getEmbeddedKafka());
        consumer2Props.put("key.deserializer", StringDeserializer.class);
        consumer2Props.put("value.deserializer", TestAvroDeserializer.class);
        consumer2Props.put("schema.registry.url", "mock://localtest");

        DefaultKafkaConsumerFactory<String, Test2> consumer2Factory = new DefaultKafkaConsumerFactory<>(consumer2Props);
        Consumer<String, Test2> consumer2 = consumer2Factory.createConsumer();
        consumer2.subscribe(Collections.singleton(OUTPUT_TOPIC));
        ConsumerRecords<String, Test2> records2 = consumer2.poll(Duration.ofSeconds(30));
        consumer2.commitSync();
        

        if (records2.iterator().hasNext()) {
            System.out.println("has next");
        } else {
            System.out.println("has no next");
        }
    }
}

When I try to consume and deserialize from topic2, I get the following exception:

Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 0
Caused by: java.io.IOException: Cannot get schema from schema registry!
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:193) ~[kafka-schema-registry-client-6.2.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndId(MockSchemaRegistryClient.java:249) ~[kafka-schema-registry-client-6.2.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaById(MockSchemaRegistryClient.java:232) ~[kafka-schema-registry-client-6.2.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:307) ~[kafka-avro-serializer-6.2.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:107) ~[kafka-avro-serializer-6.2.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:86) ~[kafka-avro-serializer-6.2.0.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) ~[kafka-avro-serializer-6.2.0.jar:na]
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeKey(SourceNode.java:54) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:65) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:895) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1008) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:812) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:564) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:523) ~[kafka-streams-2.7.1.jar:na]

No messages are consumed.
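The message "unknown schema for id 0" refers to the Confluent wire format that KafkaAvroDeserializer expects: byte 0 is a magic byte with value 0, bytes 1 to 4 hold the big-endian schema ID, and the Avro payload follows. The trace shows the failure inside a Kafka Streams StreamThread, in SourceNode.deserializeKey, so it is the Streams application deserializing a record key. One plausible reading (an assumption, not a confirmed diagnosis): the test writes the key 123456L with LongSerializer, whose 8 big-endian bytes begin with five zero bytes, so an Avro deserializer sees magic byte 0 followed by schema ID 0. The sketch below decodes the header the same way using only java.nio.ByteBuffer; WireFormatHeader and its methods are hypothetical names, not Confluent classes.

```java
import java.nio.ByteBuffer;

public class WireFormatHeader {

    // Confluent wire format: byte 0 is the magic byte (0), bytes 1-4 hold the schema ID.
    static final byte MAGIC_BYTE = 0x0;

    /** Returns the schema ID an Avro deserializer would look up for this payload. */
    static int schemaIdOf(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        if (buffer.get() != MAGIC_BYTE) {
            // Mimics the "Unknown magic byte!" check; Confluent throws a SerializationException here
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        return buffer.getInt(); // big-endian, the ByteBuffer default
    }

    /** Same 8-byte big-endian layout that Kafka's LongSerializer produces. */
    static byte[] longBytes(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    public static void main(String[] args) {
        byte[] key = longBytes(123456L);
        System.out.println("magic byte = " + key[0]);
        System.out.println("schema id  = " + schemaIdOf(key));
    }
}
```

Running main prints magic byte 0 and schema id 0 for that key, which matches the ID in the exception.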

So I tried overriding SpecificAvroSerde, registering the schema directly, and using this deserializer:

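import java.io.File;
import java.io.IOException;

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
import org.apache.avro.Schema;
import org.apache.kafka.common.serialization.Deserializer;

public class TestAvroDeserializer<T extends org.apache.avro.specific.SpecificRecord>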
        extends SpecificAvroDeserializer<T> implements Deserializer<T> {

    private final KafkaAvroDeserializer inner;

    public TestAvroDeserializer() throws IOException, RestClientException {
        MockSchemaRegistryClient mockedClient = new MockSchemaRegistryClient();

        Schema.Parser parser = new Schema.Parser();
        Schema test2Schema = parser.parse(new File("./src/main/resources/avro/test2.avsc"));
        mockedClient.register("test2-value", test2Schema , 1, 0);
        inner = new KafkaAvroDeserializer(mockedClient);
    }

    /**
     * For testing purposes only.
     */
    TestAvroDeserializer(final SchemaRegistryClient client) throws IOException, RestClientException {
        MockSchemaRegistryClient mockedClient = new MockSchemaRegistryClient();

        Schema.Parser parser = new Schema.Parser();
        Schema test2Schema = parser.parse(new File("./src/main/resources/avro/test2.avsc"));
        mockedClient.register("test2-value", test2Schema , 1, 0);

        inner = new KafkaAvroDeserializer(mockedClient);
    }
}

This deserializer doesn't work either. Does anyone have experience with this kind of test using EmbeddedKafka and MockSchemaRegistry? Or should I use a different approach?
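One detail that matters for this approach: when serdes are configured with schema.registry.url: mock://localtest, the Confluent serializers do not each build a private registry; they resolve one shared MockSchemaRegistryClient per scope name (the Confluent client exposes this as MockSchemaRegistry.getClientForScope in io.confluent.kafka.schemaregistry.testutil; worth checking against the version in use). A MockSchemaRegistryClient created with new, as in TestAvroDeserializer, is a separate instance that the other serdes never see. The toy model below illustrates only that sharing rule with plain JDK maps; MockScopeModel and FakeRegistry are hypothetical names, not Confluent classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MockScopeModel {

    /** Hypothetical stand-in for a schema registry client: subject -> schema string. */
    static final class FakeRegistry {
        final Map<String, String> subjects = new ConcurrentHashMap<>();
    }

    // One registry per scope name, shared JVM-wide (simplified model of mock:// URL resolution)
    private static final Map<String, FakeRegistry> SCOPES = new ConcurrentHashMap<>();

    /** Resolves "mock://<scope>" to the shared registry for that scope. */
    static FakeRegistry forUrl(String url) {
        if (!url.startsWith("mock://")) {
            throw new IllegalArgumentException("Not a mock URL: " + url);
        }
        String scope = url.substring("mock://".length());
        return SCOPES.computeIfAbsent(scope, s -> new FakeRegistry());
    }

    public static void main(String[] args) {
        FakeRegistry producerSide = forUrl("mock://localtest");
        FakeRegistry consumerSide = forUrl("mock://localtest");
        FakeRegistry standalone = new FakeRegistry(); // like `new MockSchemaRegistryClient()`

        producerSide.subjects.put("topic2-value", "{\"type\": \"string\"}");
        System.out.println(consumerSide.subjects.containsKey("topic2-value")); // true: same scope
        System.out.println(standalone.subjects.containsKey("topic2-value"));   // false: separate instance
    }
}
```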

I'd be very happy if someone could help. Thanks in advance.

I found a suitable way to integration-test my topology.

I use the TopologyTestDriver from the kafka-streams-test-utils package.

Add this dependency to Maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <scope>test</scope>
</dependency>

For the application described in the question, setting up the TopologyTestDriver looks like this. The code is only meant to show the steps in order.

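import java.util.Map;
import java.util.Properties;

import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.jupiter.api.Test;

class Stream1TopologyTest { // hypothetical test class name

    // Assumed types: Topic1/Topic2 are the Avro-generated classes, keys are Avro-encoded strings
    private final PrimitiveAvroSerde<String> keySerde = new PrimitiveAvroSerde<>();
    private final SpecificAvroSerde<Topic1> valueSerdeTopic1 = new SpecificAvroSerde<>();
    private final SpecificAvroSerde<Topic2> valueSerdeTopic2 = new SpecificAvroSerde<>();
    private Topic1 topic1AvroModel; // sample input record; populate it, e.g. with Topic1.newBuilder()

    @Test
    void test() {
        // Every serde points at the same mock scope ("schemas"), so they share one in-memory registry
        keySerde.configure(Map.of(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://schemas"), true);
        valueSerdeTopic1.configure(Map.of(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://schemas"), false);
        valueSerdeTopic2.configure(Map.of(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://schemas"), false);

        final StreamsBuilder builder = new StreamsBuilder();

        Configuration config = new Configuration(); // class where you declare your spring cloud stream functions
        KStream<String, Topic1> input = builder.stream("topic1", Consumed.with(keySerde, valueSerdeTopic1));

        KStream<String, Topic2> output = config.stream1().apply(input);
        output.to("topic2"); // written with the default serdes configured below

        Topology topology = builder.build();
        Properties streamsConfig = new Properties();
        streamsConfig.putAll(Map.of(
                StreamsConfig.APPLICATION_ID_CONFIG, "topology-test-driver",
                StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ignored", // the test driver never contacts a broker
                KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://schemas",
                StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, PrimitiveAvroSerde.class.getName(),
                StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName()
        ));

        // try-with-resources closes the driver and its state even if the test fails
        try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamsConfig)) {
            TestInputTopic<String, Topic1> inputTopic = testDriver.createInputTopic("topic1", keySerde.serializer(), valueSerdeTopic1.serializer());
            TestOutputTopic<String, Topic2> outputTopic = testDriver.createOutputTopic("topic2", keySerde.deserializer(), valueSerdeTopic2.deserializer());
            inputTopic.pipeInput("key", topic1AvroModel); // Write to the input topic which applies the topology processor of your spring-cloud-stream app
            KeyValue<String, Topic2> outputRecord = outputTopic.readKeyValue(); // Read from the output topic
        }
    }
}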

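If you write more tests, I recommend abstracting the setup code so you don't repeat it in every test. I highly recommend this example from the spring-cloud-stream-samples repository, which led me to the TopologyTestDriver solution.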
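Most of the repeated setup is configuration. A minimal sketch of pulling it into one helper, assuming test classes call it from a setup method: TopologyTestConfig and its methods are hypothetical names, and the string keys are the literal values of KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG and the StreamsConfig constants used in the test, written out so the sketch compiles with the JDK alone (in a real test class, reference the constants instead).

```java
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper that builds the configuration repeated in every TopologyTestDriver test. */
public final class TopologyTestConfig {

    // Literal values of the Confluent/Kafka Streams config constants
    static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
    static final String APPLICATION_ID = "application.id";
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
    static final String DEFAULT_KEY_SERDE = "default.key.serde";
    static final String DEFAULT_VALUE_SERDE = "default.value.serde";

    private TopologyTestConfig() {
    }

    /** Mock registry URL; a distinct scope per test class keeps registered schemas isolated. */
    static String mockUrl(String scope) {
        return "mock://" + scope;
    }

    /** Map passed to each serde's configure(map, isKey) call. */
    static Map<String, Object> serdeConfig(String scope) {
        return Map.of(SCHEMA_REGISTRY_URL, mockUrl(scope));
    }

    /** Properties handed to new TopologyTestDriver(topology, props). */
    static Properties streamsProperties(String applicationId, String scope) {
        Properties props = new Properties();
        props.put(APPLICATION_ID, applicationId);
        props.put(BOOTSTRAP_SERVERS, "ignored"); // the test driver never connects to a broker
        props.put(SCHEMA_REGISTRY_URL, mockUrl(scope));
        props.put(DEFAULT_KEY_SERDE, "io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde");
        props.put(DEFAULT_VALUE_SERDE, "io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde");
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProperties("topology-test-driver", "schemas");
        System.out.println(props.getProperty(SCHEMA_REGISTRY_URL)); // mock://schemas
        System.out.println(serdeConfig("schemas"));                 // {schema.registry.url=mock://schemas}
    }
}
```

In a test, the map would go to each serde's configure(...) call and the Properties to the TopologyTestDriver constructor. Closing the driver after each test (and, if your Confluent version provides it, calling MockSchemaRegistry.dropScope(scope)) keeps tests from seeing each other's registered schemas.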