测试 Kafka Streams 拓扑

Test Kafka Streams topology

我正在寻找一种测试 Kafka Streams 应用程序的方法。这样我就可以定义输入事件,测试套件会向我显示输出。

如果没有真正的 Kafka 设置,这可能吗?

您可以只 运行 本地的单个 Zookeeper 和代理来测试 Kafka Streams 应用程序。

只需按照这些快速入门指南操作即可:

另请查看此 Kafka Streams 示例(在 JavaDocs 中有详细的演练说明):

  1. 当您询问是否可以在没有真正的 Kafka 设置的情况下测试 Kafka Streams 应用程序时,您可以在 Scala 中尝试这个 Mocked Streams 库。 Mocked Streams 1.0 是 Scala >= 2.11.8 的库,它允许您 unit-test 处理 Kafka Streams 应用程序的拓扑结构(自 Apache Kafka >=0.10.1 起),无需 Zookeeper 和 Kafka Brokers。 参考:https://github.com/jpzk/mockedstreams

  2. 您还可以使用 scalatest-embedded-kafka 这是一个库,它为 运行 您的 ScalaTest 规范提供 in-memory Kafka 代理。它使用 Kafka 0.10.1.1 和 ZooKeeper 3.4.8.
    参考:https://github.com/manub/scalatest-embedded-kafka#scalatest-embedded-kafka-streams

祝你好运!

您应该检查 Kafka 单元 here

您的测试设置应如下所示:

KafkaUnit kafkaUnitServer = new KafkaUnit();
kafkaUnitServer.startup();
kafkaUnitServer.createTopic(testTopic);
KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(testTopic, "key", "value");
kafkaUnitServer.sendMessages(keyedMessage);

然后要阅读您的消息并断言一切正常,您可以这样做:

List<String> messages = kafkaUnitServer.readMessages(testTopic, 1);

这实际上启动了一个嵌入式 kafka,可帮助您将所需的一切包含在测试中。

您可以更高级一点,将嵌入式 kafka 设置为 setup() 方法(或 Spock 中的 setupSpec()),并在 teardown() 中停止嵌入式 kafka。

你可以使用https://github.com/jpzk/mockedstreams看下面的例子...

import com.madewithtea.mockedstreams.MockedStreams

val input = Seq(("x", "v1"), ("y", "v2"))
val exp = Seq(("x", "V1"), ("y", "V2"))
val strings = Serdes.String()

MockedStreams()
  .topology { builder => builder.stream(...) [...] }
  .input("topic-in", strings, strings, input)
  .output("topic-out", strings, strings, exp.size) shouldEqual exp

希望对您有所帮助...

更新 Kafka 1.1.0(2018 年 3 月 23 日发布):

KIP-247 added official test utils. Per the Upgrade Guide:

There is a new artifact kafka-streams-test-utils providing a TopologyTestDriver, ConsumerRecordFactory, and OutputVerifier class. You can include the new artifact as a regular dependency to your unit tests and use the test driver to test your business logic of your Kafka Streams application. For more details, see KIP-247.

来自documentation:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-test-utils</artifactId>
        <version>1.1.0</version>
        <scope>test</scope>
    </dependency>

测试驱动程序模拟库运行时不断从输入主题中获取记录并通过遍历拓扑进行处理。您可以使用测试驱动程序来验证您指定的处理器拓扑是否使用手动输入的数据记录计算出正确的结果。测试驱动程序捕获结果记录并允许查询其嵌入式状态存储:

    // Create your topology
    Topology topology = new Topology();
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

    // Run it on the test driver
    TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);

    // Feed input data
    ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
    testDriver.pipe(factory.create("key", 42L));

    // Verify output
    ProducerRecord<String, Integer> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());

详情见the documentation


ProcessorTopologyTestDriver 从 0.11.0.0 开始可用。它在 kafka-streams 测试工件中可用(在 Maven 中用 <classifier>test</classifier> 指定):

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.11.0.0</version>
        <classifier>test</classifier>
        <scope>test</scope>
    </dependency>

您还需要添加 kafka-clients 测试工件:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
        <classifier>test</classifier>
        <scope>test</scope>
    </dependency>

然后就可以使用测试驱动了。根据 Javadoc,首先创建一个 ProcessorTopologyTestDriver:

    StringSerializer strSerializer = new StringSerializer();
    StringDeserializer strDeserializer = new StringDeserializer();
    Properties props = new Properties();
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
    props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
    props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
    props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
    props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
    props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
    StreamsConfig config = new StreamsConfig(props);
    TopologyBuilder builder = ...
    ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);

您可以将输入馈送到拓扑中,就好像您实际上已经写入了输入主题之一:

    driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);

并读取输出主题:

    ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
    ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
    ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);

然后你可以断言这些结果。

Spring kafka 支持使用嵌入式 kafka 进行单元测试,请参阅 https://docs.spring.io/spring-kafka/docs/2.1.0.RELEASE/reference/html/_reference.html#__embeddedkafka_annotation

此外,kafka 团队正在努力发布流的测试驱动程序 https://issues.apache.org/jira/browse/KAFKA-3625

如果你想测试一个使用 Processor APIKafka Stream 拓扑, may not work properly. So after a few hours of researching in the Javadocs and official docs 提供的代码我想出了一个工作代码来测试你的自定义处理器已使用 JUnit.

实施
public class TopologySpec {

private TopologyTestDriver testDriver;

@Before
public void setup() {
    // Processor API
    Topology topology = new Topology();
    topology.addSource("sourceProcessor", "input-topic");
    // In this case, 'EventProcessor' is a custom processor
    // that I implemented and I want to test
    topology.addProcessor("processor", EventProcessor::new, "sourceProcessor");
    topology.addSink("sinkProcessor", "output-topic", "processor");

    // Setup test driver
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    // EventProcessor is a <String,String> processor 
    // so we set those serders
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    testDriver = new TopologyTestDriver(topology, config);
}

@After
public void tearDown() {
    testDriver.close(); // Close processors after finish the tests
}

@Test
public void firstTest() {
    // Simulate a producer that sends the message "value,val" without key
    ConsumerRecordFactory factory =
            new ConsumerRecordFactory(new StringSerializer(), new StringSerializer());

    testDriver.pipeInput(factory.create("input-topic", "value,val"));

    // Simulate a consumer that reads from the output topic 
    // where are supposed to be the messages after being processed
    // by your custom processor
    ProducerRecord<String, String> record1 =
            testDriver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());

    // Compare the output to ensure that your custom processor
    // is working properly. In this case, my processor consumes
    // the message, concatenates ":::processed" to it, and
    // push it to the output-topic
    OutputVerifier.compareValue(record1, "value,val:::processed");
}
}