如何在 Android Worker 中完成 Kotlin Flow

How to complete a Kotlin Flow in Android Worker

我正在研究在我当前的 Android 应用程序中使用 Kotlin Flow

我的应用程序通过 Retrofit API 调用从远程服务器检索数据。

其中一些 API 的 return 50,000 个数据项位于 500 个项目页面中。

每个 API 响应包含一个 HTTP Link header 包含下一页完成 URL。

这些调用最多可能需要 2 秒才能完成。

为了减少运行时间,我使用了 Kotlin Flow 来同时处理每个页面 数据,同时还进行下一页 API 调用。

我的流程定义如下:

private val persistenceThreadPool = Executors.newFixedThreadPool(3).asCoroutineDispatcher()
private val internalWorkWorkState = MutableStateFlow<Response<List<MyPage>>?>(null)
private val workWorkState = internalWorkWorkState.asStateFlow()

private val myJob: Job

init {
    myJob = GlobalScope.launch(persistenceThreadPool) {
        workWorkState.collect { page ->
            if (page == null) {
            } else managePage(page!!)
        }
    }
}

我的递归函数定义如下,获取所有页面:-

    private suspend fun managePages(accessToken: String, response: Response<List<MyPage>>) {
        when {
            result != null -> return
            response.isSuccessful -> internalWorkWorkState.emit(response)
            else -> {
                manageError(response.errorBody())
                result = Result.failure()
                return
            }
        }

        response.headers().filter { it.first == HTTP_HEADER_LINK && it.second.contains(REL_NEXT) }.forEach {
            val parts = it.second.split(OPEN_ANGLE, CLOSE_ANGLE)
            if (parts.size >= 2) {
                managePages(accessToken, service.myApiCall(accessToken, parts[1]))
            }
        }
    }

   private suspend fun managePage(response: Response<List<MyPage>>) {
        val pages = response.body()

        pages?.let {
            persistResponse(it)
        }
    }

    private suspend fun persistResponse(myPage: List<MyPage>) {
        val myPageDOs = ArrayList<MyPageDO>()

        myPage.forEach { page ->
            myPageDOs.add(page.mapDO())
        }

        database.myPageDAO().insertAsync(myPageDOs)
    }
    

我的许多问题是

  1. 这段代码没有插入我检索到的所有数据项

  2. 检索完所有数据项后如何完成流程

  3. 检索并保存所有数据项后如何完成 GlobalScope 作业

更新

通过进行以下更改,我成功地插入了所有数据

 private val persistenceThreadPool = Executors.newFixedThreadPool(3).asCoroutineDispatcher()
    private val completed = CompletableDeferred<Int>()

    private val channel = Channel<Response<List<MyPage>>?>(UNLIMITED)
    private val channelFlow = channel.consumeAsFlow().flowOn(persistenceThreadPool)

    private val frank: Job

    init {
        frank = GlobalScope.launch(persistenceThreadPool) {
            channelFlow.collect { page ->
                if (page == null) {
                    completed.complete(totalItems)
                } else managePage(page!!)
            }
        }
    }


...
...
...

   channel.send(null)
   completed.await()

   return result ?: Result.success(outputData)

我不喜欢依赖 CompletableDeferred,有没有比这更好的方法来知道 Flow 何时完成所有事情?

有几种方法可以实现所需的行为。我建议使用 coroutineScope which is designed specifically for parallel decomposition. It also provides good cancellation and error handling behaviour out of the box. In conjunction with Channel.close behaviour 它使实现非常简单。从概念上讲,实现可能如下所示:

 suspend fun fetchAllPages() {
    coroutineScope {
        val channel = Channel<MyPage>(Channel.UNLIMITED)
        launch(Dispatchers.IO){ loadData(channel) }
        launch(Dispatchers.IO){ processData(channel) }
    }
}

suspend fun loadData(sendChannel: SendChannel<MyPage>){
    while(hasMoreData()){
        sendChannel.send(loadPage())
    }
    sendChannel.close()
}

suspend fun processData(channel: ReceiveChannel<MyPage>){
    for(page in channel){
        // process page
    }
}

它的工作方式如下:

  1. coroutineScope 暂停,直到所有 children 完成。所以你不再需要 CompletableDeferred
  2. loadData() 循环加载页面并将它们发布到频道中。它会在所有页面加载完毕后立即关闭频道。
  3. processData从频道中一个接一个地抓取项目并进行处理。一旦处理完所有项目(并且通道已关闭),循环将立即结束。

在此实现中,生产者协程独立工作,没有 back-pressure,因此如果处理速度慢,可能会占用大量内存。限制缓冲区容量,以便在缓冲区已满时暂停生产者协程。 使用 channels fan-out 行为来启动多个处理器以加速计算可能也是一个好主意。

您正在寻找 flow builder and Flow.buffer():

suspend fun getData(): Flow<Data> = flow {
    var pageData: List<Data>
    var pageUrl: String? = "bla"
    while (pageUrl != null) {
        TODO("fetch pageData from pageUrl and change pageUrl to the next page")
        emitAll(pageData)
    }
}
    .flowOn(Dispatchers.IO /* no need for a thread pool executor, IO does it automatically */)
    .buffer(3)

您可以像普通流程一样使用它,迭代等。如果您想知道输出的总长度,您应该使用可变闭包变量在消费者上计算它。请注意,您不需要在任何地方(理想情况下永远)使用 GlobalScope。