RxJava2 中的功能反应运算符,可根据先前状态和附加输入生成新状态

Functional-reactive operator in RxJava2 that produces new state from previous state and additional inputs

我正在做一个小展示,旨在演示如何使用函数式反应式编程(特别是 RxJava2)以纯函数式的方式编写交互式程序。

我的目标是创建一个简单的互动游戏。游戏的主要功能(我们称其为 Next Game State,简称为 NGS)采用游戏的当前状态、用户输入以及随机数,并根据这三个输入计算游戏的下一个状态。到目前为止相当简单。用户输入和随机数是从 Iterable 或通过生成器创建的 Flowable。我设想游戏状态本身也将是 Flowable(但我对此可能是错误的)。

我正在努力寻找合适的函数反应运算符,将函数应用于三个输入并产生游戏的下一个状态。最初,我认为 Flowable.zip(source1, source2, source3, zipper) 是正确的操作:它可以采用三个流并通过 NGS 函数将其组合成一个新的 Flowable。不幸的是,这并没有说明生成的 Flowable 本身需要成为 zip 操作的输入之一,这似乎是不可能的设置。我的下一个想法是使用 Flowable.generate,但我需要来自其他 Flowable 的两个额外输入来计算下一个状态,并且无法将它们提供给 generate 运算符。

简而言之,我正在尝试实现与此(伪)大理石图类似的东西:

  Game --------------(0)-------------(1)-----------------(2)-----------------(3)---
  State                \   _______   / \       _______   / \       _______   /
                        \_|       |_/   \_____|       |_/   \_____|       |_/
                          | Next  |           | Next  |           | Next  |
                     _____| Game  |      _____| Game  |      _____| Game  |
                    /   __| State |     /   __| State |     /   __| State |
                   /   /  '_______'    /   /  '_______'    /   /  '_______'
                  /   /               /   /               /   /
  User           /   /               /   /               /   /
  Input --------o---/---------------o---/---------------o---/--------------------
                   /                   /                   /
  Random          /                   /                   /
  Number --------o-------------------o-------------------o---------------------------

顶行,我承认,有点不标准,但这是我最好的形象化尝试,从初始游戏状态 (0) 开始,每个连续的游戏状态 (1)(2)(3) 等,是根据先前的游戏状态加上额外的输入创建的。

我意识到可能有一种相对直接的方法可以通过 Emitter 或在下游订阅者内部执行此操作,但此练习的目标之一是表明无需完全解决此问题副作用或 void 方法。

所以,长话短说,是否有支持此行为的现有运算符?如果没有,创建一个会涉及什么?或者是否有一些替代解决方案使用略有不同的整体设置?

假设

用户输入和随机数是两个独立的流,可以随时自行发出。

解决方案

long story short, is there an existing operator that supports this behavior?

是的,有。你可以使用 #scan(seed<T>, { current<T>, upstream<Q> -> newValue<T> }.

import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.processors.PublishProcessor
import org.junit.jupiter.api.Test

class So65589721 {
    @Test
    fun `65589721`() {
        val userInput = PublishProcessor.create<String>()
        val randomNumber = PublishProcessor.create<Int>()

        val scan = Flowable.combineLatest(userInput, randomNumber, BiFunction<String, Int, Pair<String, Int>> { u, r ->
            u to r
        }).scan<GameState>(GameState.State1, { currentState, currentInputTuple ->
            // calculate new state from `currentState` and combined pair
            GameState.State3(currentInputTuple.first, currentInputTuple.second)
        })

        val test = scan.test()

        randomNumber.offer(42)

        userInput.offer("input1")
        userInput.offer("input2")

        test
            .assertValues(GameState.State1, GameState.State3("input1", 42), GameState.State3("input2", 42))
    }

    interface GameState {
        object State1 : GameState
        data class State3(val value: String, val random: Int) : GameState
    }
}

此示例展示了如何使用 scan 从给定的输入对和当前状态计算新状态。使用 scan,您可以以安全的方式“保持”状态,而不会将其暴露给副作用。

备注

  • 传递给 scanseed-value 将作为订阅的第一个值发出。