生成将函数应用于 Reactor Flux 的两个连续元素的元素
Generate elements applying a function to two consecutive elements of a Reactor Flux
我有一个 Flux 会发射一些元素,为了简单起见,假设一个 Flux<String>
-> "1", "2", "3", "n"
我需要做的是获取两个连续的元素并应用一个操作(例如 flatMap)从中生成一些元素。我们再次假设该函数将第一个元素与第二个元素连接起来,将第二个元素与第一个元素连接起来:
f(x,y) -> "xy", "yx"
所以从原始 Flux 发出的最终序列应该是:
"12" - "21" - "23" - "32" - "3n" - "n3"
这是怎么做到的?
使用buffer(2, 1)
解决:
fun main() {
val flux = listOf("1", "2", "3", "4").toFlux()
flux.buffer(2, 1)
.flatMap {
if (it.size == 2) {
listOf(it[0] + it[1], it[1] + it[0]).toFlux()
} else {
Flux.empty()
}
}.doOnNext { println(it) }
.subscribe()
}
我有一个 Flux 会发射一些元素,为了简单起见,假设一个 Flux<String>
-> "1", "2", "3", "n"
我需要做的是获取两个连续的元素并应用一个操作(例如 flatMap)从中生成一些元素。我们再次假设该函数将第一个元素与第二个元素连接起来,将第二个元素与第一个元素连接起来:
f(x,y) -> "xy", "yx"
所以从原始 Flux 发出的最终序列应该是:
"12" - "21" - "23" - "32" - "3n" - "n3"
这是怎么做到的?
使用buffer(2, 1)
解决:
fun main() {
val flux = listOf("1", "2", "3", "4").toFlux()
flux.buffer(2, 1)
.flatMap {
if (it.size == 2) {
listOf(it[0] + it[1], it[1] + it[0]).toFlux()
} else {
Flux.empty()
}
}.doOnNext { println(it) }
.subscribe()
}