Complete all tasks, but no more than K tasks at the same time, via Project Reactor

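I'm a beginner with Project Reactor. I thought this would be simple, but I can't find a solution. I have N expensive tasks to run, and I want something like a bounded semaphore in Java: don't request the next element until the current count of running tasks is less than K. In short: complete all tasks, but never run more than K of them at the same time.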

Flux.range(1, 100)
    .parallel()
    .limit(K) // Something like this
    .doOnNext(i -> expensiveWork(i))
    .subscribe()

I found a question about this on SO, but it is not about Reactor, although the idea is the same. Please help.
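To make the goal concrete, here is a minimal plain-Java sketch (no Reactor) of the behavior I am after, using java.util.concurrent.Semaphore. The class name BoundedTasks, the runAll helper, and the 20 ms sleep standing in for the expensive task are made up for illustration only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedTasks {

    /**
     * Runs n tasks, never more than k at once, and returns the highest
     * number of tasks observed running at the same time.
     */
    static int runAll(int n, int k) {
        Semaphore permits = new Semaphore(k);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();

        for (int i = 0; i < n; i++) {
            // Block the submitting thread until fewer than k tasks are running.
            permits.acquireUninterruptibly();
            pool.submit(() -> {
                try {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    expensiveWork();
                } finally {
                    running.decrementAndGet();
                    permits.release(); // let the next task start
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    // Hypothetical stand-in for the real expensive task.
    private static void expensiveWork() {
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + runAll(20, 3));
    }
}
```

The reported peak never exceeds k, because a task can only start after a permit has been acquired and only gives it back after it has finished. What I am looking for is the Reactor equivalent of this, without blocking a thread on acquire.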

Closer to my real case:

httpClient.getMainPageAsMono()
    .flatMapMany(html -> {
        return Flux.fromIterable(getLinksFromPage(html));
    })
    .parallel(k)
    .runOn(Schedulers.boundedElastic())
    .flatMap(link -> {
        // THIS PART REQUESTS ALL LINKS AT THE SAME TIME
        // INSTEAD OF THROTTLING
        return client.getAnotherPageByLink(link);
    })
    .....
    .subscribe()

In other words, if getLinksFromPage returns 1000 links, the next link should not be taken until a pending client.getAnotherPageByLink(link) call has finished.

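How about this solution? I removed parallel from the Flux and buffer 10 elements at a time instead. The elements of each buffer can then be processed in parallel.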

public static final void main(String... args) {

     Flux.range(1, 1000)
            .buffer(10)
            .doOnNext(grp -> grp.parallelStream().forEach(p -> System.out.println(Instant.now() + " : " + p)))
            .doOnNext(grp -> sleep(1000)) // Wait for 1 second to see how the algorithm is working
            .doOnNext(grp -> System.out.println("####")) 
            .subscribe();
}

private static void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

The output is:

2021-06-12T14:16:23.760298200Z : 8
2021-06-12T14:16:23.760298200Z : 4
2021-06-12T14:16:23.760298200Z : 10
2021-06-12T14:16:23.760298200Z : 1
2021-06-12T14:16:23.760298200Z : 3
2021-06-12T14:16:23.760298200Z : 5
2021-06-12T14:16:23.760298200Z : 7
2021-06-12T14:16:23.760298200Z : 2
2021-06-12T14:16:23.760298200Z : 6
2021-06-12T14:16:23.760298200Z : 9
####
2021-06-12T14:16:24.784628Z : 17
2021-06-12T14:16:24.784628Z : 16
2021-06-12T14:16:24.784628Z : 20
2021-06-12T14:16:24.784628Z : 14
2021-06-12T14:16:24.784628Z : 11
2021-06-12T14:16:24.784628Z : 13
2021-06-12T14:16:24.784628Z : 18
2021-06-12T14:16:24.784628Z : 19
2021-06-12T14:16:24.784628Z : 12
2021-06-12T14:16:24.785801500Z : 15

As you can see, the elements are processed in parallel in groups of 10, one group per second.

Using .parallel() alone gives you a ParallelFlux, but to tell the resulting ParallelFlux where to run each rail (and, by extension, to run the rails in parallel) you have to use .runOn(Scheduler scheduler).

So we should use .parallel(int parallelism).runOn(Scheduler scheduler):

public static void main(String[] args) throws InterruptedException {
    int k = 3;        

    Flux.range(1, 100)
            .parallel(k)  // k rails
            .runOn(Schedulers.boundedElastic()) // the rails will run on this scheduler
            .doOnNext(i -> expensiveWork(i))
            .subscribe();

    Thread.currentThread().join(); // Just so program won't finish
}

private static void expensiveWork(Integer i) {
    Instant start = Instant.now();
    while (Duration.between(start, Instant.now()).getSeconds() < 5) ;
    System.out.println(Instant.now()+" - "+i+" - Done expensive work");
}

Output:

2021-06-12T13:46:58.445Z - 3 - Done expensive work
2021-06-12T13:46:58.445Z - 1 - Done expensive work
2021-06-12T13:46:58.445Z - 2 - Done expensive work
2021-06-12T13:47:03.453Z - 5 - Done expensive work
2021-06-12T13:47:03.453Z - 6 - Done expensive work
2021-06-12T13:47:03.453Z - 4 - Done expensive work
2021-06-12T13:47:08.453Z - 8 - Done expensive work
2021-06-12T13:47:08.453Z - 7 - Done expensive work
2021-06-12T13:47:08.453Z - 9 - Done expensive work
...

As you can see, the number of tasks executing in parallel is limited to k.

This is easy to do without parallel, using the overloaded version of flatMap that lets you specify the concurrency:

flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency)

httpClient.getMainPageAsMono()
    .flatMapMany(html -> {
        return Flux.fromIterable(getLinksFromPage(html));
    })
    .flatMap(link -> client.getAnotherPageByLink(link), k)
    .....
    .subscribe()

Judging by the code, this operation is not expensive in terms of CPU but in terms of IO, so there is no need to use ParallelFlux.
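To check the effect of the concurrency argument yourself, a small self-contained sketch along these lines may help. It assumes reactor-core is on the classpath. FlatMapConcurrencyDemo, peakInFlight and fakeRequest are made-up names, and fakeRequest is only a stand-in for client.getAnotherPageByLink(link), simulated with a non-blocking 50 ms delay.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class FlatMapConcurrencyDemo {

    /** Returns the peak number of simulated requests in flight at once. */
    static int peakInFlight(int links, int k) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        Flux.range(1, links)
            // flatMap subscribes to at most k inner publishers at any time
            .flatMap(link -> fakeRequest(link)
                    // count a request as started when flatMap subscribes to it
                    .doOnSubscribe(s -> peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max))
                    // on a Mono this runs before the result is handed to flatMap
                    .doOnTerminate(inFlight::decrementAndGet),
                k)
            .blockLast(); // blocking only so the demo can return a value

        return peak.get();
    }

    // Hypothetical stand-in for client.getAnotherPageByLink(link).
    static Mono<String> fakeRequest(int link) {
        return Mono.delay(Duration.ofMillis(50)).map(tick -> "page " + link);
    }

    public static void main(String[] args) {
        System.out.println("peak in flight: " + peakInFlight(100, 5));
    }
}
```

The reported peak should stay at or below k, because flatMap only requests another link from upstream once one of the inner publishers has completed. Note that this only throttles work that is itself non-blocking (it returns a Mono or Flux); blocking calls would still need a scheduler such as boundedElastic.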