如何 运行 并行通量,但按顺序收集结果

How to run flux in parallel, but collect result in sequence

我想 运行 并行通量,但按顺序收集结果。假设我在并行完成一些任务后有 [3,2,1] 的通量我希望结果仍然是 [3,2,1]

val mono = Flux.fromIterable(3 downTo 1)
      .map { it.toString() }
      // this will return the same number
      .flatMap { number -> task(number) }
      .doOnNext { println("Number of $it") }
      // I got 1, 3, 2 which is good because I want it to run in parallel
      .collectList()
      .doOnNext { println(it) }
      // I still got [1,3,2] but I want it to be [3,2,1] according to the iterable order.
      .block()

我认为 flatMapSequential 运算符就是您要找的。

Flux.fromIterable(3 downTo 1)
      .map { it.toString() }
      .flatMapSequential { number -> task(number) }
      .doOnNext { println("Number of $it") }
      .collectList()
      .doOnNext { println(it) }

此处,flatMapSequential 急切地订阅其内部发布者(与 flatMap 并行),但根据需要按其源元素的顺序合并它们。

示例输出:

3
1
2
[3, 2, 1]