如何限制 flatMap 进程中当前正在处理的事件?
How do I limit the events currently being processed in a flatMap process?
给定以下代码
public static void main(String[] args) {
long start = System.currentTimeMillis();
Flux.<Long>generate(s -> s.next(System.currentTimeMillis() - start))
.flatMap(DemoApp::delayedAction)
.doOnNext(l -> System.out.println(l + " -- " + (System.currentTimeMillis() - start)))
.blockLast(Duration.ofSeconds(3));
}
private static Publisher<? extends Long> delayedAction(Long l) {
return Mono.just(l).delayElement(Duration.ofSeconds(1));
}
从输出中可以看出,delayedAction
中同时存在大量事件"processed"。
在此示例中,在几毫秒内生成了 256 个事件,然后等待大约一秒钟,直到它们再次发出。
我想将这个数字限制为例如10、我该怎么做?
解决方案应该独立于内部发生的事情 delayedAction
背景
延迟操作中真正发生的事情是:我执行 HTTP 请求并开始无限量(或非常大的)请求听起来不是个好主意。
已经有一个方法:Flux.flatMap(Function> mapper, int concurrency)
来自其文档:
The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel.
给定以下代码
public static void main(String[] args) {
long start = System.currentTimeMillis();
Flux.<Long>generate(s -> s.next(System.currentTimeMillis() - start))
.flatMap(DemoApp::delayedAction)
.doOnNext(l -> System.out.println(l + " -- " + (System.currentTimeMillis() - start)))
.blockLast(Duration.ofSeconds(3));
}
private static Publisher<? extends Long> delayedAction(Long l) {
return Mono.just(l).delayElement(Duration.ofSeconds(1));
}
从输出中可以看出,delayedAction
中同时存在大量事件"processed"。
在此示例中,在几毫秒内生成了 256 个事件,然后等待大约一秒钟,直到它们再次发出。
我想将这个数字限制为例如10、我该怎么做?
解决方案应该独立于内部发生的事情 delayedAction
背景
延迟操作中真正发生的事情是:我执行 HTTP 请求并开始无限量(或非常大的)请求听起来不是个好主意。
已经有一个方法:Flux.flatMap(Function> mapper, int concurrency)
来自其文档:
The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel.