如何在不丢失其类型的情况下将 rxjava2 Zip 函数的元数(从 Single/Observable)概括为 n 个可空参数?

How can I generalize the arity of rxjava2 Zip function (from Single/Observable) to n Nullable arguments without lose its types?


1) 类型检查丢失

使用数组参数 Single.zip() 版本我丢失了强类型参数。

2) 源参数不能为空

我无法发送可为空的源值作为 Single.zip() 函数的参数

3) 我想要一种方法来替代采用 Object[] 未键入的方法:

4) 我不想要可变对象,我不想在我的 class 中使用 var ], 我想用 val

public static <T, R> Single<R> zipArray(Function<? super Object[], ? extends R> zipper, SingleSource<? extends T>... sources) ...


在 haskell 中,我可以使用应用函子实现此目的:

f <$> a1 <*> a2 <*> a3 <*> a4 <*> a5 <*> a6 <*> a7 <*> a8 <*> a9 <*> a10 <*> a11

成为f :: Int -> Int -> Int -> Int -> Int -> Int -> Int -> String -> String -> String -> Int

a1 .. a11值对应每种类型



在所有这些情况下,都很好,因为每个参数都是键入的。 但有一个限制,直到 9 个单一来源

在我们的项目中,我们需要更多的资源,因为我们有很多服务想要实现异步(在我们的例子中是 11 个参数)。

但问题是参数失去了它们的强类型,更糟糕的是,其中一些可能是 Nullable


val bothSubscribed = CountDownLatch(2) // Change this value to 0 to run the test faster
val subscribeThreadsStillRunning = CountDownLatch(1) // Change this value to 0 to run the test faster

val service = { s1: String,
                s2: Int,
                s3: String?,
                s4: Int,
                s5: String,
                s6: String,
                s7: String,
                s8: String,
                s9: String,
                s10: String?,
                s11: String ->
    val result =
        listOf(s1, "$s2", s3 ?: "none", "$s4", s5, s6, s7, s8, s9, s10 ?: "none", s11).joinToString(separator = ";")

val createSingle = { value: String ->
        .create<String> { emitter ->
            println("Parallel subscribe $value on ${Thread.currentThread().name}")
            subscribeThreadsStillRunning.await(20, TimeUnit.SECONDS)

val s1 = createSingle("v1")
val s2 = Single.just(2)
val s3 = null
val s4 = Single.just(4)
val s5 = createSingle("v5")
val s6 = createSingle("v6")
val s7 = createSingle("v7")
val s8 = createSingle("v8")
val s9 = createSingle("v9")
val s10 = null
val s11 = createSingle("v11")


 val result = Single.zipArray(
) { arrayResult ->
        arrayResult[0] as String,
        arrayResult[1] as String,
        arrayResult[2] as String?,
        arrayResult[3] as String,
        arrayResult[4] as String,
        arrayResult[5] as String,
        arrayResult[6] as String,
        arrayResult[7] as String,
        arrayResult[8] as String,
        arrayResult[9] as String?,
        arrayResult[10] as String

    .awaitDone(50, TimeUnit.SECONDS)


arrayResult[0] as String,
arrayResult[1] as Int,
arrayResult[2] as String?,
arrayResult[3] as Int,
arrayResult[4] as String,
arrayResult[5] as String,
arrayResult[6] as String,
arrayResult[7] as String,
arrayResult[8] as String,
arrayResult[9] as String?,
arrayResult[10] as String


1) Single.zip() 函数中的 None 可以将可为 null 的值作为参数。

2) 您可以更改数组中值的顺序,它可能会因为类型检查转换而失败


  1. Kotlin 扩展函数
  2. 柯里化函数(Kotlin 允许)
  3. 部分应用(Kotlin 也允许)
  4. 函子和应用函子概念(Single 和 Observable 类 是应用函子)
  5. 混合在一起:

首先,zipOver 函数,用于不可空值:

 * Returns a Single that is the result of applying the function inside the context (a Single in this case).
 * This function is curried and will be used as an Applicative Functor, so each argument will be given
 * one by one
 * @param <B> the result value type
 * @param applicativeValue
 *            a Single that contains the input value of the function
 * @return the Single returned when the function is applied to the applicative value.
 * Each application will be executed on <b>a new thread</b> if and only if the Single is subscribed on a specific scheduler
infix fun <A, B> Single<(A) -> (B)>.zipOver(applicativeValue: Single<A>): Single<B> =
    Single.zip(this, applicativeValue, BiFunction { f, a -> f(a) })

然后,zipOverNullable 对于可空值:

 * Returns a Single that is the result of applying the function inside the context (a Single in this case).
 * This function is curried and will be used as an Applicative Functor, so each argument will be given
 * one by one
 * @param <B> the result value type
 * @param applicativeValue
 *            a Single that contains the input value of the function and it can be null
 * @return the Single returned when the function is applied to the applicative value even when
 * it is null.
 * Each application will be executed on <b>a new thread</b> if and only if the Single is subscribed on a specific scheduler
infix fun <A, B> Single<(A?) -> (B)>.zipOverNullable(applicativeValue: Single<A>?): Single<B> =
    when {
        applicativeValue != null -> Single.zip(this, applicativeValue, BiFunction { f, a -> f(a) })
        else -> this.map { it(null) }

我使用 org.funktionale.currying 作为 curried() 函数


    val bothSubscribed = CountDownLatch(0) // Change this value to 2 to run the test slowly
    val subscribeThreadsStillRunning = CountDownLatch(0) // Change this value to 1 to run the test slowly

    val service: (String, String, String?, String, String, String, String, String, String, String?, String) -> Single<String> = { 
                    s1: String,
                    s2: Int,
                    s3: String?,
                    s4: Int,
                    s5: String,
                    s6: String,
                    s7: String,
                    s8: String,
                    s9: String,
                    s10: String?,
                    s11: String ->
        val result =
            listOf(s1, "$s2", s3 ?: "none", "$s4", s5, s6, s7, s8, s9, s10 ?: "none", s11).joinToString(separator = ";")

    val createSingle = { value: String ->
            .create<String> { emitter ->
                println("Parallel subscribe $value on ${Thread.currentThread().name}")
                subscribeThreadsStillRunning.await(20, TimeUnit.SECONDS)

    val s1: Single<String> = createSingle("v1")
    val s2: Single<Int> = Single.just(2)
    // Here, we move the Nullable value outside, so the whole Single<String> is Nullable, and not the value inside the Single`enter code here`
    val s3: Single<String>? = null
    val s4: Single<String> = Single.just(4)
    val s5: Single<String> = createSingle("v5")
    val s6: Single<String> = createSingle("v6")
    val s7: Single<String> = createSingle("v7")
    val s8: Single<String> = createSingle("v8")
    val s9: Single<String> = createSingle("v9")
    val s10: Single<String>? = null
    val s11 = createSingle("v11")

    // Here I curry the function, so I can apply one by one the the arguments via zipOver() and preserve the types 

    val singleFunction: Single<(String) -> (String) -> (String?) -> (String) -> (String) -> (String) -> (String) -> (String) -> (String) -> (String?) -> (String) -> Single<String>> =

    val result = singleFunction
        .flatMap { it }

        .awaitDone(50, TimeUnit.SECONDS)


Parallel subscribe v11 on RxCachedThreadScheduler-10
Parallel subscribe v8 on RxCachedThreadScheduler-8
Parallel subscribe 4 on RxCachedThreadScheduler-4
Parallel subscribe v5 on RxCachedThreadScheduler-5
Parallel subscribe v9 on RxCachedThreadScheduler-9
Parallel subscribe 2 on RxCachedThreadScheduler-3
Parallel subscribe v6 on RxCachedThreadScheduler-6
Parallel subscribe v1 on RxCachedThreadScheduler-2
Parallel subscribe v7 on RxCachedThreadScheduler-7


    val result = singleFunction
        .flatMap { it }


具有 11 个参数的函数是不干净代码的一个很好的例子。相反,您应该考虑构建一个模型来满足您的需求。像这样,您也可以为每个参数提供有意义的名称。

data class MyObject(...)

class MyMutableObject {
    private lateinit var param0: String
    private var param1: Int

    fun setParam0(value: String) {
        param0 = value
    fun setParam1(value: Int) {
        param1 = value

    fun toMyObject() = MyObject(

有了这个模型,您可以在每个来源上使用 zipWith() 运算符。

      .zipWith(source0, MyMutableObject::setParam0)
      .zipWith(source1, MyMutableObject::setParam1)

如果您考虑将可空性抽象为 Maybe,您可以简单地定义一个扩展函数接收 Maybe 有数据或无数据并适当地映射它。

inline fun <T, U, R> Single<T>.zipWith(
        other: MaybeSource<U>,
        crossinline zipper: (T, U) -> R
) = other.zipWith(toMaybe()) { t, u -> zipper(t, u) }