Project Reactor: How to control Flux emission
I have a Flux that emits some Dates. Each Date is mapped to 1024 simulated HTTP requests, which I run on some Executor.
What I want is to wait for all 1024 HTTP requests to finish before the next Date is emitted.
Currently, when it runs, onNext() is called several times and then settles at some steady rate.
How can I change this behavior?
P.S. I'm open to changing the architecture if needed.
private void run() throws Exception {
Executor executor = Executors.newFixedThreadPool(2);
Flux<Date> source = Flux.generate(emitter ->
emitter.next(new Date())
);
source
.log()
.limitRate(1)
.doOnNext(date -> System.out.println("on next: " + date))
.map(date -> Flux.range(0, 1024))
.flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
.subscribeOn(Schedulers.fromExecutor(executor)))
.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
}
The simulated HTTP request:
private static String simulateHttp() {
try {
System.out.println("start http call");
Thread.sleep(3_000);
} catch (Exception e) {}
return "HTML content";
}
Edit: code adapted from the answer:
- First, there was a bug in my code (another flatMap was needed).
- Second, I added a concurrency argument of 1 to flatMap (it seems both are needed).
Executor executor = Executors.newSingleThreadExecutor();
Flux<Date> source = Flux.generate(emitter -> {
System.out.println("emitter called!");
emitter.next(new Date());
});
source
.limitRate(1)
.map(date -> Flux.range(0, 16))
.flatMap(Function.identity(), 1) // concurrency = 1
.flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
.subscribeOn(Schedulers.fromExecutor(executor)), 1) // concurrency = 1
.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
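For reference, here is a minimal sketch (not from the answer) of the same idea using concatMap instead of the two flatMap calls with concurrency = 1; each batch of 16 simulated requests completes before the next Date is generated:

source
    .limitRate(1)
    // wait for the whole inner batch to complete before requesting the next Date
    .concatMap(date -> Flux.range(0, 16)
            // run each simulated request on the executor, one after another
            .concatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
                    .subscribeOn(Schedulers.fromExecutor(executor))))
    .subscribe(s -> System.out.println(s));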
You should take a look at these methods:
concatMap
ensures that the elements of the flux are processed sequentially inside the operator:
Generation of inners and subscription: this operator waits for one
inner to complete before generating the next one and subscribing to
it.
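A minimal, self-contained sketch of that sequencing (the values and delays are illustrative, not from the question):

import java.time.Duration;
import reactor.core.publisher.Flux;

public class ConcatMapDemo {
    public static void main(String[] args) throws InterruptedException {
        Flux.range(0, 3)
                // each inner Flux completes before the next one is subscribed to,
                // so the output is strictly 0:a, 0:b, 1:a, 1:b, 2:a, 2:b
                .concatMap(i -> Flux.just(i + ":a", i + ":b")
                        .delayElements(Duration.ofMillis(100)))
                .subscribe(System.out::println);
        Thread.sleep(1_000); // give the delayed elements time to arrive
    }
}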
flatMap
lets you do the same by exposing concurrency and prefetch arguments, which give you finer control over this behavior:
The concurrency argument allows to control how many Publisher can be
subscribed to and merged in parallel. In turn, that argument shows the
size of the first Subscription.request(long) to the upstream. The
prefetch argument allows to give an arbitrary prefetch size to the
merged Publisher (in other words prefetch size means the size of the
first Subscription.request(long) to the merged Publisher).
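For comparison, a sketch of the flatMap variant with explicit concurrency and prefetch arguments (values illustrative):

Flux.range(0, 3)
        // subscribe to at most 1 inner Publisher at a time, prefetching 1 element,
        // which reproduces concatMap-like sequential processing
        .flatMap(i -> Flux.just(i + ":a", i + ":b"), 1, 1)
        .subscribe(System.out::println);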