单身名单,只使用第一次成功的结果

List of Singles, use only first successful result

假设我有输入参数列表:

val array = arrayOf(input1, input2, ... inputN) // N is usually less than 10

我必须通过一些繁重的计算来处理这些参数。因此,为了优化它,我试图 运行 每个计算都在他自己的线程中 运行 与其他人同时进行。我为此使用 RxJava2:

sealed class Result {
   object Success : Result()
   object NotFound : Result()
}

fun processArray(arr: Array<Input>): Single<Result> {
    val singles = arr.map { input ->
        Single.fromCallable { 
            val time = System.currentTimeMillis()
            val r = process(input) 

            log("$r, took ${System.currentTimeMillis() - time}ms")
            return@fromCallable r
        }
            .subscribeOn(Schedulers.newThread())
    }

    return Single.zip(singles) { results ->
        val r = results.map { it as Result }
            .firstOrNull { it is Result.Success }
            ?: Result.NotFound

        log("result is: $r")
        return@zip r
    }
} 

fun process(input: Input): Result

一切正常,但当我查看日志时,我通常会看到以下内容:

NotFound, took 130ms
NotFound, took 300ms
Success, took 220ms
NotFound, took 78ms
NotFound, took 540ms
NotFound, took 256ms
result is Success
proccessing took 547ms

这没有意义,因为我只需要返回第一个成功的结果。但是这段代码将等待所有这些完成,即使它已经找到了 Result.Success(正如您从日志中看到的,总花费的时间 == 547 毫秒,因为我们正在等待带有 NotFound, took 540ms 的项目到完成,但此刻我得到了 Result.Success 我知道剩下的将是 NotFound)

所以问题是:

是否可以 运行 多个 Single.fromCallable() 并在找到第一个成功结果后处理其余的?

您可以合并而不是 zip,然后过滤以获取类型为 Success 的第一个元素,如下所示

sealed class Result {
    object Success : Result()
    object NotFound : Result()
}

fun processArray(arr: Array<Input>): Single<Result> {
    val singles = arr.map { input ->
        Single.fromCallable {
            val time = System.currentTimeMillis()
            val r = process(input)

            log("$r, took ${System.currentTimeMillis() - time}ms")
            return@fromCallable r
        }
            .subscribeOn(Schedulers.newThread())
    }

    return Single
        .merge(singles)
        .filter { it is Result.Success }
        .firstElement()
        .switchIfEmpty(Single.just(Result.NotFound))
}

fun process(input: Input): Result