限制 Reactor Flux 读取 Mongodb 集合的吞吐量
Limit the throughput of a Reactor Flux reading a Mongodb collection
我正在使用 Spring 5,详细介绍 Reactor 项目,从巨大的 Mongo 集合中读取信息到 Kafka 主题。不幸的是,Kafka 消息的产生比消费它们的程序要快得多。所以,我需要实现一些 backpressure 机制。
假设我希望每秒处理 100 条消息。谷歌搜索了一下,我决定将 buffer(int maxSize)
method, zipping 结果的特性与 Flux
结合使用预定义的时间间隔发出消息。
// Create a clock that emits an event every second
final Flux<Long> clock = Flux.interval(Duration.ofMillis(1000L));
// Create a buffered producer
final Flux<ProducerRecord<String, Data>> outbound =
repository.findAll()
.map(this::buildData)
.map(this::createKafkaMessage)
.buffer(100)
// Limiting the emission in time interval
.zipWith(clock, (msgs, tick) -> msgs)
.flatMap(Flux::fromIterable);
// Subscribe a Kafka sender
kafkaSender.createOutbound()
.send(outbound)
.then()
.block();
有没有更聪明的方法来做到这一点?我的意思是,在我看来它有点复杂(拉链部分,总体而言)。
是的,您可以直接使用 delayElements(Duration.ofSeconds(1))
操作而无需压缩它。 Reactor Cool 项目总是有改进,因为它会不断升级,所以让我们保持粘性 :) 希望对您有所帮助!
我正在使用 Spring 5,详细介绍 Reactor 项目,从巨大的 Mongo 集合中读取信息到 Kafka 主题。不幸的是,Kafka 消息的产生比消费它们的程序要快得多。所以,我需要实现一些 backpressure 机制。
假设我希望每秒处理 100 条消息。谷歌搜索了一下,我决定将 buffer(int maxSize)
method, zipping 结果的特性与 Flux
结合使用预定义的时间间隔发出消息。
// Create a clock that emits an event every second
final Flux<Long> clock = Flux.interval(Duration.ofMillis(1000L));
// Create a buffered producer
final Flux<ProducerRecord<String, Data>> outbound =
repository.findAll()
.map(this::buildData)
.map(this::createKafkaMessage)
.buffer(100)
// Limiting the emission in time interval
.zipWith(clock, (msgs, tick) -> msgs)
.flatMap(Flux::fromIterable);
// Subscribe a Kafka sender
kafkaSender.createOutbound()
.send(outbound)
.then()
.block();
有没有更聪明的方法来做到这一点?我的意思是,在我看来它有点复杂(拉链部分,总体而言)。
是的,您可以直接使用 delayElements(Duration.ofSeconds(1))
操作而无需压缩它。 Reactor Cool 项目总是有改进,因为它会不断升级,所以让我们保持粘性 :) 希望对您有所帮助!