RxJava2 中的功能反应运算符,可根据先前状态和附加输入生成新状态
Functional-reactive operator in RxJava2 that produces new state from previous state and additional inputs
我正在做一个小展示,旨在演示如何使用函数式反应式编程(特别是 RxJava2)以纯函数式的方式编写交互式程序。
我的目标是创建一个简单的互动游戏。游戏的主要功能(我们称其为 Next Game State,简称为 NGS)采用游戏的当前状态、用户输入以及随机数,并根据这三个输入计算游戏的下一个状态。到目前为止相当简单。用户输入和随机数是从 Iterable
或通过生成器创建的 Flowable
。我设想游戏状态本身也将是 Flowable(但我对此可能是错误的)。
我正在努力寻找合适的函数反应运算符,将函数应用于三个输入并产生游戏的下一个状态。最初,我认为 Flowable.zip(source1, source2, source3, zipper)
是正确的操作:它可以采用三个流并通过 NGS 函数将其组合成一个新的 Flowable。不幸的是,这并没有说明生成的 Flowable 本身需要成为 zip 操作的输入之一,这似乎是不可能的设置。我的下一个想法是使用 Flowable.generate
,但我需要来自其他 Flowable
的两个额外输入来计算下一个状态,并且无法将它们提供给 generate
运算符。
简而言之,我正在尝试实现与此(伪)大理石图类似的东西:
Game --------------(0)-------------(1)-----------------(2)-----------------(3)---
State \ _______ / \ _______ / \ _______ /
\_| |_/ \_____| |_/ \_____| |_/
| Next | | Next | | Next |
_____| Game | _____| Game | _____| Game |
/ __| State | / __| State | / __| State |
/ / '_______' / / '_______' / / '_______'
/ / / / / /
User / / / / / /
Input --------o---/---------------o---/---------------o---/--------------------
/ / /
Random / / /
Number --------o-------------------o-------------------o---------------------------
顶行,我承认,有点不标准,但这是我最好的形象化尝试,从初始游戏状态 (0)
开始,每个连续的游戏状态 (1)
,(2)
、(3)
等,是根据先前的游戏状态加上额外的输入创建的。
我意识到可能有一种相对直接的方法可以通过 Emitter
或在下游订阅者内部执行此操作,但此练习的目标之一是表明无需完全解决此问题副作用或 void
方法。
所以,长话短说,是否有支持此行为的现有运算符?如果没有,创建一个会涉及什么?或者是否有一些替代解决方案使用略有不同的整体设置?
假设
用户输入和随机数是两个独立的流,可以随时自行发出。
解决方案
long story short, is there an existing operator that supports this behavior?
是的,有。你可以使用 #scan(seed<T>, { current<T>, upstream<Q> -> newValue<T> }
.
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.processors.PublishProcessor
import org.junit.jupiter.api.Test
class So65589721 {
@Test
fun `65589721`() {
val userInput = PublishProcessor.create<String>()
val randomNumber = PublishProcessor.create<Int>()
val scan = Flowable.combineLatest(userInput, randomNumber, BiFunction<String, Int, Pair<String, Int>> { u, r ->
u to r
}).scan<GameState>(GameState.State1, { currentState, currentInputTuple ->
// calculate new state from `currentState` and combined pair
GameState.State3(currentInputTuple.first, currentInputTuple.second)
})
val test = scan.test()
randomNumber.offer(42)
userInput.offer("input1")
userInput.offer("input2")
test
.assertValues(GameState.State1, GameState.State3("input1", 42), GameState.State3("input2", 42))
}
interface GameState {
object State1 : GameState
data class State3(val value: String, val random: Int) : GameState
}
}
此示例展示了如何使用 scan
从给定的输入对和当前状态计算新状态。使用 scan
,您可以以安全的方式“保持”状态,而不会将其暴露给副作用。
备注
- 传递给
scan
的 seed-value
将作为订阅的第一个值发出。
我正在做一个小展示,旨在演示如何使用函数式反应式编程(特别是 RxJava2)以纯函数式的方式编写交互式程序。
我的目标是创建一个简单的互动游戏。游戏的主要功能(我们称其为 Next Game State,简称为 NGS)采用游戏的当前状态、用户输入以及随机数,并根据这三个输入计算游戏的下一个状态。到目前为止相当简单。用户输入和随机数是从 Iterable
或通过生成器创建的 Flowable
。我设想游戏状态本身也将是 Flowable(但我对此可能是错误的)。
我正在努力寻找合适的函数反应运算符,将函数应用于三个输入并产生游戏的下一个状态。最初,我认为 Flowable.zip(source1, source2, source3, zipper)
是正确的操作:它可以采用三个流并通过 NGS 函数将其组合成一个新的 Flowable。不幸的是,这并没有说明生成的 Flowable 本身需要成为 zip 操作的输入之一,这似乎是不可能的设置。我的下一个想法是使用 Flowable.generate
,但我需要来自其他 Flowable
的两个额外输入来计算下一个状态,并且无法将它们提供给 generate
运算符。
简而言之,我正在尝试实现与此(伪)大理石图类似的东西:
Game --------------(0)-------------(1)-----------------(2)-----------------(3)---
State \ _______ / \ _______ / \ _______ /
\_| |_/ \_____| |_/ \_____| |_/
| Next | | Next | | Next |
_____| Game | _____| Game | _____| Game |
/ __| State | / __| State | / __| State |
/ / '_______' / / '_______' / / '_______'
/ / / / / /
User / / / / / /
Input --------o---/---------------o---/---------------o---/--------------------
/ / /
Random / / /
Number --------o-------------------o-------------------o---------------------------
顶行,我承认,有点不标准,但这是我最好的形象化尝试,从初始游戏状态 (0)
开始,每个连续的游戏状态 (1)
,(2)
、(3)
等,是根据先前的游戏状态加上额外的输入创建的。
我意识到可能有一种相对直接的方法可以通过 Emitter
或在下游订阅者内部执行此操作,但此练习的目标之一是表明无需完全解决此问题副作用或 void
方法。
所以,长话短说,是否有支持此行为的现有运算符?如果没有,创建一个会涉及什么?或者是否有一些替代解决方案使用略有不同的整体设置?
假设
用户输入和随机数是两个独立的流,可以随时自行发出。
解决方案
long story short, is there an existing operator that supports this behavior?
是的,有。你可以使用 #scan(seed<T>, { current<T>, upstream<Q> -> newValue<T> }
.
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.processors.PublishProcessor
import org.junit.jupiter.api.Test
class So65589721 {
@Test
fun `65589721`() {
val userInput = PublishProcessor.create<String>()
val randomNumber = PublishProcessor.create<Int>()
val scan = Flowable.combineLatest(userInput, randomNumber, BiFunction<String, Int, Pair<String, Int>> { u, r ->
u to r
}).scan<GameState>(GameState.State1, { currentState, currentInputTuple ->
// calculate new state from `currentState` and combined pair
GameState.State3(currentInputTuple.first, currentInputTuple.second)
})
val test = scan.test()
randomNumber.offer(42)
userInput.offer("input1")
userInput.offer("input2")
test
.assertValues(GameState.State1, GameState.State3("input1", 42), GameState.State3("input2", 42))
}
interface GameState {
object State1 : GameState
data class State3(val value: String, val random: Int) : GameState
}
}
此示例展示了如何使用 scan
从给定的输入对和当前状态计算新状态。使用 scan
,您可以以安全的方式“保持”状态,而不会将其暴露给副作用。
备注
- 传递给
scan
的seed-value
将作为订阅的第一个值发出。