Flux.generate 与消费者和并行
Flux.generate with Consumer and parallel
我有简单的 Flux
Flux<Long> flux = Flux.generate(
AtomicLong::new,
(state, sink) -> {
long i = state.getAndIncrement();
sink.next(i);
if (i == 3) sink.complete();
return state;
}, (state) -> System.out.println("state: " + state));
在单线程中按预期工作:
flux.subscribe(System.out::println);
输出为
0 1 2 3 state: 4
但是当我切换到并行时:
flux.parallel().runOn(Schedulers.elastic()).subscribe(System.out::println);
应打印状态的消费者:未调用 Number。我刚看到:
0 3 2 1
这是错误还是预期的功能?
我不是反应专家,但在深入研究源代码后,我发现这种行为是设计使然的;似乎创建 ParallelFlux 具有阻止状态消费者调用的副作用;如果你想并行并调用状态消费者,你可以使用:
flux.publishOn(Schedulers.elastic()).subscribe(System.out::println);
我有简单的 Flux
Flux<Long> flux = Flux.generate(
AtomicLong::new,
(state, sink) -> {
long i = state.getAndIncrement();
sink.next(i);
if (i == 3) sink.complete();
return state;
}, (state) -> System.out.println("state: " + state));
在单线程中按预期工作:
flux.subscribe(System.out::println);
输出为
0 1 2 3 state: 4
但是当我切换到并行时:
flux.parallel().runOn(Schedulers.elastic()).subscribe(System.out::println);
应打印状态的消费者:未调用 Number。我刚看到:
0 3 2 1
这是错误还是预期的功能?
我不是反应专家,但在深入研究源代码后,我发现这种行为是设计使然的;似乎创建 ParallelFlux 具有阻止状态消费者调用的副作用;如果你想并行并调用状态消费者,你可以使用:
flux.publishOn(Schedulers.elastic()).subscribe(System.out::println);