是否有正确的方法来链接协程流?

Is there a proper way to chain coroutines Flow?

我正在寻找一种方法来链接多个流程,其方式与在 RxJava 中链接操作的方式类似。这是当前实现的代码:

            driverPreferencesFlow
                    .stateIn(lifecycleScope)
                    .transform<DriverPreferences, Response<DriverCredentials>> { it ->
                        Log.d(App.TAG, "Got driver from cache, ask for driver from server")
                        repo.getDriver(it.driver.cell, it.driver.secret)
                    }
                    .onStart {
                    }
                    .onCompletion { e ->
                    }
                    .catch { e ->
                        Log.e(App.TAG, "Something went wrong on loading with driver", e)
                        Response.Error.Exception(e)
                    }
                    .collect { it ->
                        Log.d(App.TAG, "Got driver from server")
                        Log.d(App.TAG, "Collect new driver state ${it.javaClass}")
                        _newDriverState.value = it
                    }

在此实现中,调用了第二个 operation/call (repo.getDriver()),但从未完成。似乎暂停了。

你实现类似任务的方法是什么?

我不确定你为什么使用 transform 而不是 mapflatMap,但根据 documentation 你必须 emit转换函数中的值,所以我假设它看起来像:

.transform<DriverPreferences, Response<DriverCredentials>> { it ->
    Log.d(App.TAG, "Got driver from cache, ask for driver from server")
    emit(repo.getDriver(it.driver.cell, it.driver.secret))
}

当你想从一个Flow中获取emit,然后基于第一个的元素创建另一个时,你需要使用flatMap操作(flatMapConcat/flatMapMerge), 与 RxJava 中的方式相同。这是一个包含两个整数流的简单示例,您可以在其中看到 flatMapmap 运算符的用法:

import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

val a = flowOf(1, 2, 3)
val b = flowOf(3, 4, 5)

val result = a.flatMapConcat { intFromA ->
  b.map { intFromB ->
    intFromA + intFromB
  }
}

runBlocking {
  launch {
    result.collect {
      print(it)
    }
  }
}

是的,正如@romstn 正确指出的那样,transform 运算符需要手动调用 emit 函数,类似于 RxJava 中的 Observable.create()https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html