Java 迭代列表<?扩展通量>
Java iterate List<? extends Flux>
我的方法得到了Flux。
我如何遍历 Flux?
我想遍历它的对象并对每个子对象进行操作。
public void write(List<? extends Flux<Child>> childFlux) throws Exception {
childFlux.stream()
.map(children -> children.collectList())
.forEach(child -> run(child); //not compile
}
public void run(Child child) {
//TO DO
}
这似乎是一种反模式。但是也有一些基本的错误。
map(children -> children.collectList())
会 return 一个 Mono<List<Child>>
forEach(child -> run(child);
你忘记了一个右括号,应该是 forEach(child -> run(child));
.
- 但它不会编译,因为 child 将是
Mono<List<Child>>
而不是 Child
- 当你使用响应式编程时,订阅前什么都没有发生
你真正需要做的是
Flux.concat(childFlux).subscribe(this::run)
Concatenate all sources provided in an Iterable, forwarding elements emitted by the sources downstream.
或
Flux.merge(childFlux).subscribe(this::run)
Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike concat, sources are subscribed to eagerly.
我的方法得到了Flux。 我如何遍历 Flux? 我想遍历它的对象并对每个子对象进行操作。
public void write(List<? extends Flux<Child>> childFlux) throws Exception {
childFlux.stream()
.map(children -> children.collectList())
.forEach(child -> run(child); //not compile
}
public void run(Child child) {
//TO DO
}
这似乎是一种反模式。但是也有一些基本的错误。
map(children -> children.collectList())
会 return 一个Mono<List<Child>>
forEach(child -> run(child);
你忘记了一个右括号,应该是forEach(child -> run(child));
.- 但它不会编译,因为 child 将是
Mono<List<Child>>
而不是Child
- 当你使用响应式编程时,订阅前什么都没有发生
你真正需要做的是
Flux.concat(childFlux).subscribe(this::run)
Concatenate all sources provided in an Iterable, forwarding elements emitted by the sources downstream.
或
Flux.merge(childFlux).subscribe(this::run)
Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike concat, sources are subscribed to eagerly.