Complete all tasks, but no more than K tasks at the same time, via Project Reactor
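I'm a beginner with Project Reactor. I think this is simple, but I can't find a solution.
I have N expensive tasks to run, and I want to implement something like a bounded semaphore in Java (don't request the next element until the number of currently running tasks is less than K).
In short: complete all tasks, but never run more than K tasks at the same time.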
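For comparison, here is roughly what the bounded-semaphore behaviour described above looks like in plain Java, without Reactor. It is a minimal sketch that assumes blocking tasks. The class name SemaphoreLimiter, the 20 ms sleep standing in for expensiveWork, and the task counts are hypothetical. The producer loop calls acquire() before submitting each task, so the next task is not taken until a running one has released its permit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreLimiter {

    /** Runs n simulated tasks with at most k running at once; returns the peak concurrency observed. */
    static int runAll(int n, int k) throws Exception {
        Semaphore permits = new Semaphore(k);             // k permits = at most k tasks in progress
        AtomicInteger running = new AtomicInteger();      // tasks currently executing
        AtomicInteger peak = new AtomicInteger();         // highest value `running` reached
        ExecutorService pool = Executors.newCachedThreadPool(); // unbounded on purpose: the semaphore does the limiting
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < n; i++) {
                permits.acquire();                        // do not take the next task until a slot is free
                futures.add(pool.submit(() -> {
                    try {
                        peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                        Thread.sleep(20);                 // hypothetical stand-in for expensiveWork(i)
                        return null;
                    } finally {
                        running.decrementAndGet();
                        permits.release();                // hand the slot to the next task
                    }
                }));
            }
            for (Future<?> f : futures) {
                f.get();                                  // wait until all n tasks are done
            }
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("peak concurrency: " + runAll(20, 3));
    }
}
```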
Flux.range(1, 100)
    .parallel()
    .limit(K) // Something like this
    .doOnNext(i -> expensiveWork(i))
    .subscribe()
I found something similar on SO, but not for Reactor; the idea is the same, though. Please help.
Closer to my real case:
httpClient.getMainPageAsMono()
    .flatMapMany(html -> Flux.fromIterable(getLinksFromPage(html)))
    .parallel(k)
    .runOn(Schedulers.boundedElastic())
    .flatMap(link -> {
        // ON THIS PART IT EXECUTES ALL LINKS AT THE SAME TIME
        // INSTEAD OF THROTTLING
        return client.getAnotherPageByLink(link);
    })
    .....
    .subscribe()
That is, if getLinksFromPage returns 1000 links, each next link should not be passed to client.getAnotherPageByLink(link) until a running call has finished.
How about this solution? I removed parallel from the Flux and buffer 10 elements at a time; each buffered group can then be processed in parallel.
import java.time.Instant;
import reactor.core.publisher.Flux;

public static final void main(String... args) {
    Flux.range(1, 1000)
        .buffer(10)
        // the 10 elements of a group run on the common ForkJoinPool; forEach returns when all are done
        .doOnNext(grp -> grp.parallelStream().forEach(p -> System.out.println(Instant.now() + " : " + p)))
        .doOnNext(grp -> sleep(1000)) // Wait for 1 second to see how the algorithm is working
        .doOnNext(grp -> System.out.println("####"))
        .subscribe();
}

private static void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
The output is:
2021-06-12T14:16:23.760298200Z : 8
2021-06-12T14:16:23.760298200Z : 4
2021-06-12T14:16:23.760298200Z : 10
2021-06-12T14:16:23.760298200Z : 1
2021-06-12T14:16:23.760298200Z : 3
2021-06-12T14:16:23.760298200Z : 5
2021-06-12T14:16:23.760298200Z : 7
2021-06-12T14:16:23.760298200Z : 2
2021-06-12T14:16:23.760298200Z : 6
2021-06-12T14:16:23.760298200Z : 9
####
2021-06-12T14:16:24.784628Z : 17
2021-06-12T14:16:24.784628Z : 16
2021-06-12T14:16:24.784628Z : 20
2021-06-12T14:16:24.784628Z : 14
2021-06-12T14:16:24.784628Z : 11
2021-06-12T14:16:24.784628Z : 13
2021-06-12T14:16:24.784628Z : 18
2021-06-12T14:16:24.784628Z : 19
2021-06-12T14:16:24.784628Z : 12
2021-06-12T14:16:24.785801500Z : 15
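As you can see, each group of 10 elements is processed in parallel, one group per second. The next group does not start until every element of the current group (and the sleep) has finished.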
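One property of the buffer approach is worth keeping in mind: a group only finishes when its slowest element does, so a single slow task holds up the next group even while other slots sit idle. The arithmetic below is a small plain-Java model, not Reactor code. It compares that fixed-group scheduling with a sliding window, where a new task starts as soon as any of the k slots frees up; that is, roughly, how flatMap with a concurrency limit behaves. The class name and the task durations are hypothetical.

```java
import java.util.PriorityQueue;

class BatchVsWindow {

    /** Total time when tasks run in fixed groups of k and each group waits for its slowest task. */
    static long batched(long[] durations, int k) {
        long total = 0;
        for (int start = 0; start < durations.length; start += k) {
            long slowest = 0;
            for (int i = start; i < Math.min(start + k, durations.length); i++) {
                slowest = Math.max(slowest, durations[i]);
            }
            total += slowest;   // the next group starts only after the whole group is done
        }
        return total;
    }

    /** Total time when up to k tasks run at once and a new one starts as soon as any slot frees up. */
    static long slidingWindow(long[] durations, int k) {
        PriorityQueue<Long> slotFreeAt = new PriorityQueue<>(); // time at which each of the k slots is free
        for (int i = 0; i < k; i++) {
            slotFreeAt.add(0L);
        }
        long finish = 0;
        for (long d : durations) {
            long start = slotFreeAt.poll();   // earliest free slot, tasks taken in submission order
            long end = start + d;
            slotFreeAt.add(end);
            finish = Math.max(finish, end);
        }
        return finish;
    }

    public static void main(String[] args) {
        long[] durations = {5, 1, 1, 1, 1, 1};   // hypothetical task durations in seconds
        System.out.println("batched:        " + batched(durations, 2));       // 7
        System.out.println("sliding window: " + slidingWindow(durations, 2)); // 5
    }
}
```

With equal durations both strategies take the same time. The difference only shows up when task durations vary, which is common for network calls.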
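Using only .parallel() gives you a ParallelFlux, but to tell the resulting ParallelFlux where to run each rail (and, by extension, to run the rails in parallel) you have to use .runOn(Scheduler scheduler).
So we should use .parallel(int parallelism) together with .runOn(Scheduler scheduler):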
import java.time.Duration;
import java.time.Instant;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public static void main(String[] args) throws InterruptedException {
    int k = 3;
    Flux.range(1, 100)
        .parallel(k)                         // k rails
        .runOn(Schedulers.boundedElastic())  // the rails will run on this scheduler
        .doOnNext(i -> expensiveWork(i))     // sequential within each rail, so at most k at once
        .subscribe();
    Thread.currentThread().join(); // Just so the program won't finish
}

private static void expensiveWork(Integer i) {
    Instant start = Instant.now();
    // Busy-wait for 5 seconds to simulate CPU-heavy work
    while (Duration.between(start, Instant.now()).getSeconds() < 5) ;
    System.out.println(Instant.now() + " - " + i + " - Done expensive work");
}
Output:
2021-06-12T13:46:58.445Z - 3 - Done expensive work
2021-06-12T13:46:58.445Z - 1 - Done expensive work
2021-06-12T13:46:58.445Z - 2 - Done expensive work
2021-06-12T13:47:03.453Z - 5 - Done expensive work
2021-06-12T13:47:03.453Z - 6 - Done expensive work
2021-06-12T13:47:03.453Z - 4 - Done expensive work
2021-06-12T13:47:08.453Z - 8 - Done expensive work
2021-06-12T13:47:08.453Z - 7 - Done expensive work
2021-06-12T13:47:08.453Z - 9 - Done expensive work
...
As you can see, the tasks complete in waves of k = 3, five seconds apart, so at most k tasks are executed in parallel.
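For CPU-bound work outside Reactor, a fixed pool of k threads is a rough plain-Java analogue of parallel(k).runOn(...). Like each rail, each thread handles one element at a time, so no more than k run simultaneously. The sketch below only illustrates that analogy and does not show what Reactor does internally. FixedPoolRails and the trivial expensiveWork body are hypothetical placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FixedPoolRails {

    /** Hypothetical placeholder for a CPU-bound expensiveWork(i). */
    static long expensiveWork(int i) {
        return (long) i * i;
    }

    /** Runs items 1..n on a pool of k worker threads; returns the sum of the results. */
    static long runAll(int n, int k, Set<String> threadsUsed) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(k);   // at most k threads, loosely like k rails
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int item = i;
                tasks.add(() -> {
                    threadsUsed.add(Thread.currentThread().getName()); // record which worker ran it
                    return expensiveWork(item);
                });
            }
            long sum = 0;
            for (Future<Long> result : pool.invokeAll(tasks)) {   // invokeAll blocks until every task is done
                sum += result.get();
            }
            return sum;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        long sum = runAll(100, 3, threads);
        System.out.println("sum = " + sum + ", worker threads used = " + threads.size());
    }
}
```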
This can easily be done without parallel by using the overload of flatMap that lets you specify the concurrency:
flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency)
httpClient.getMainPageAsMono()
    .flatMapMany(html -> Flux.fromIterable(getLinksFromPage(html)))
    .flatMap(link -> client.getAnotherPageByLink(link), k) // at most k requests in flight
    .....
    .subscribe()
Judging by the code, this operation is not CPU-expensive but IO-bound, so there is no need to use ParallelFlux.
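Because the work here is IO-bound, the thing to bound is the number of requests in flight, not the number of threads. The sketch below is a rough plain-Java analogue of flatMap(mapper, k), not Reactor itself. It keeps at most k hypothetical non-blocking fetchPage calls outstanding, while a single timer thread simulates all the IO completions. fetchPage, the 20 ms delay, and the example.com links are placeholders. Acquiring a permit before taking the next link mirrors flatMap requesting a new upstream element only when an inner publisher completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedAsyncFetch {

    /** Hypothetical non-blocking "HTTP call": completes after 20 ms without holding a thread meanwhile. */
    static CompletableFuture<String> fetchPage(ScheduledExecutorService timer, String link) {
        CompletableFuture<String> page = new CompletableFuture<>();
        timer.schedule(() -> { page.complete("<html of " + link + ">"); }, 20, TimeUnit.MILLISECONDS);
        return page;
    }

    /** Fetches all links with at most k requests in flight; returns the peak in-flight count. */
    static int fetchAll(List<String> links, int k) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(); // one thread simulates all IO
        Semaphore slots = new Semaphore(k);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<CompletableFuture<String>> pages = new ArrayList<>();
        try {
            for (String link : links) {
                slots.acquire();                               // take the next link only when a slot is free
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                pages.add(fetchPage(timer, link).whenComplete((html, error) -> {
                    inFlight.decrementAndGet();
                    slots.release();                           // completion frees a slot for the next link
                }));
            }
            CompletableFuture.allOf(pages.toArray(new CompletableFuture<?>[0])).join();
        } finally {
            timer.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> links = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            links.add("https://example.com/page/" + i);       // placeholder links
        }
        System.out.println("peak in flight: " + fetchAll(links, 5));
    }
}
```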