生成将函数应用于 Reactor Flux 的两个连续元素的元素

Generate elements applying a function to two consecutive elements of a Reactor Flux

我有一个 Flux 会发射一些元素,为了简单起见,假设一个 Flux<String> -> "1", "2", "3", "n"

我需要做的是获取两个连续的元素并应用一个操作(例如 flatMap)从中生成一些元素。我们再次假设该函数将第一个元素与第二个元素连接起来,将第二个元素与第一个元素连接起来:

f(x,y) -> "xy", "yx"

所以从原始 Flux 发出的最终序列应该是:

"12" - "21" - "23" - "32" - "3n" - "n3"

这是怎么做到的?

使用buffer(2, 1)解决:

fun main() {
    val flux = listOf("1", "2", "3", "4").toFlux()
    flux.buffer(2, 1)
            .flatMap {
                if (it.size == 2) {
                    listOf(it[0] + it[1], it[1] + it[0]).toFlux()
                } else {
                    Flux.empty()
                }
            }.doOnNext { println(it) }
            .subscribe()
}