Project Reactor:如何控制 Flux 排放

Project Reactor: How to control Flux emission

我有一个能发出一些 Date 的通量。此 Date 映射到 1024 个模拟 HTTP 请求,我在某些 Executer.

上 运行

我想做的是在发出下一个 Date 之前等待所有 1024 个 HTTP 请求。

目前运行时,onNext()被多次调用,然后稳定在某个稳定的速率上。

我怎样才能改变这种行为?

P.S。如果需要,我愿意改变架构。

private void run() throws Exception {

    Executor executor = Executors.newFixedThreadPool(2);

    Flux<Date> source = Flux.generate(emitter ->
        emitter.next(new Date())
    );

    source
            .log()
            .limitRate(1)
            .doOnNext(date -> System.out.println("on next: " + date))
            .map(date -> Flux.range(0, 1024))
            .flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
                    .subscribeOn(Schedulers.fromExecutor(executor)))
            .subscribe(s -> System.out.println(s));

    Thread.currentThread().join();
}

模拟HTTP请求:

private static String simulateHttp() {
    try {
        System.out.println("start http call");
        Thread.sleep(3_000);
    } catch (Exception e) {}

    return "HTML content";
}

编辑:改编自答案的代码:

你应该看看这些方法:

concatMap 确保 flux 上的元素在运算符内部顺序处理:

Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.

flatMap 允许您通过公开 concurrencyprefetch 参数来执行相同的操作,这些参数使您可以更好地控制此行为:

The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel. In turn, that argument shows the size of the first Subscription.request(long) to the upstream. The prefetch argument allows to give an arbitrary prefetch size to the merged Publisher (in other words prefetch size means the size of the first Subscription.request(long) to the merged Publisher).