限制 Reactor Flux 读取 Mongodb 集合的吞吐量

Limit the throughput of a Reactor Flux reading a Mongodb collection

我正在使用 Spring 5,详细介绍 Reactor 项目,从巨大的 Mongo 集合中读取信息到 Kafka 主题。不幸的是,Kafka 消息的产生比消费它们的程序要快得多。所以,我需要实现一些 backpressure 机制。

假设我希望每秒处理 100 条消息。谷歌搜索了一下,我决定将 buffer(int maxSize) method, zipping 结果的特性与 Flux 结合使用预定义的时间间隔发出消息。

 // Create a clock that emits an event every second
 final Flux<Long> clock = Flux.interval(Duration.ofMillis(1000L));
 // Create a buffered producer
 final Flux<ProducerRecord<String, Data>> outbound =
            repository.findAll()
                      .map(this::buildData)
                      .map(this::createKafkaMessage)
                      .buffer(100)
                      // Limiting the emission in time interval
                      .zipWith(clock, (msgs, tick) -> msgs)
                      .flatMap(Flux::fromIterable);
 // Subscribe a Kafka sender
 kafkaSender.createOutbound()
            .send(outbound)
            .then()
            .block();

有没有更聪明的方法来做到这一点?我的意思是,在我看来它有点复杂(拉链部分,总体而言)。

是的,您可以直接使用 delayElements(Duration.ofSeconds(1)) 操作而无需压缩它。 Reactor Cool 项目总是有改进,因为它会不断升级,所以让我们保持粘性 :) 希望对您有所帮助!