如何在 Kotlin CoRoutine 中编写等效的 rx concatArrayEager?

How to write rx concatArrayEager equivalent in Kotlin CoRoutine?

我想将我的 rxJava 代码转换为 Kotlin CoRoutine。

下面的代码使 api 和 db 调用以及 returns 数据到 UI 无论先到什么。让我们假设 DB 响应是否恰好比 api 快。在那种情况下,api 响应将继续,直到它接收到要与数据库同步的数据,尽管它本可以更早地完成 UI 更新。

我该怎么做?

class MoviesRepository @Inject constructor(val apiInterface: ApiInterface,
                                        val MoviesDao: MoviesDao) {

fun getMovies(): Observable<List<Movie>> {
    val observableFromApi = getMoviesFromApi()
    val observableFromDb = getMoviesFromDb()
    return Observable.concatArrayEager(observableFromApi, observableFromDb)
}

fun getMoviesFromApi(): Observable<List<Movie>> {

    return apiInterface.getMovies()
            .doOnNext { it ->
                it.data?.let { it1 -> MoviesDao.insertAllMovies(it1) }
                println("Size of Movies from API %d", it.data?.size)
            }
            .map({ r -> r.data })
}

fun getMoviesFromDb(): Observable<List<Movie>> {
    return MoviesDao.queryMovies()
            .toObservable()
            .doOnNext {
                //Print log it.size :)
            }
}

}

按照这些思路应该可以工作:

data class Result(val fromApi: ???, val fromDB: ???)

fun getMovies(): Result {
    val apiRes = getMoviesFromApiAsync()
    val dbRes = getMoviesFromDbAsync()
    return Result(apiRes.await(), dbRes.await())
}

fun getMoviesFromApiAsync() = async {

    return apiInterface.getMovies()
            .doOnNext { it ->
                it.data?.let { it1 -> MoviesDao.insertAllMovies(it1) }
                println("Size of Movies from API %d", it.data?.size)
            }
            .map({ r -> r.data })
}

fun getMoviesFromDbAsync() = async {
    return MoviesDao.queryMovies()           
}

我不知道你要返回什么,所以我只用 ??? 代替。

作为第一步,您应该为 ApiInterfaceMovieDao 调用创建 suspend fun。如果他们有一些基于回调的API,你可以关注these official instructions.

你现在应该

suspend fun ApiInterface.suspendGetMovies(): List<Movie>

suspend fun MoviesDao.suspendQueryMovies(): List<Movie>

现在您可以编写以下代码:

launch(UI) {
    val fromNetwork = async(UI) { apiInterface.suspendGetMovies() }
    val fromDb = async(UI) { MoviesDao.suspendQueryMovies() }
    select<List<Movie>> {
        fromNetwork.onAwait { it }
        fromDb.onAwait { it }
    }.also { movies ->
        // act on the movies
    }
}

亮点是 select 调用,它将同时等待两个 Deferred 并根据第一个完成的调用执行操作。

如果您想确保根据网络的结果采取行动,您将需要更多代码,例如:

    val action = { movies: List<Movie> ->
        // act on the returned movie list
    }
    var gotNetworkResult = false
    select<List<Movie>> {
        fromNetwork.onAwait { gotNetworkResult = true; it }
        fromDb.onAwait { it }
    }.also(action)
    if (!gotNetworkResult) {
        action(fromNetwork.await())
    }

仅当数据库结果在网络结果之前进入时,此代码才会对数据库结果起作用,在所有情况下都会处理网络结果。