Kotlin 将顺序 IO 调用包装为 Sequence

Kotlin wrap sequential IO calls as a Sequence

我需要处理来自分页 API 端点的所有结果。我想按顺序显示所有结果。

我想出了以下(略微伪编码):

suspend fun getAllRowsFromAPI(client: Client): Sequence<Row> {
    var currentRequest: Request? = client.requestForNextPage()
    return withContext(Dispatchers.IO) {
         sequence {
            while(currentRequest != null) {
                var rowsInPage = runBlocking { client.makeRequest(currentRequest) }
                currentRequest = client.requestForNextPage()
                yieldAll(rowsInPage)
            }
        }
     }
}

这个功能有效,但我不确定一些事情:

  1. runBlocking 内部发生的 API 请求是否仍在 IO 调度程序中发生?
  2. 有没有办法重构代码以在产生当前结果之前启动下一个请求,然后等待它?

问题 1: API-request 仍然会 运行 在 IO-dispatcher 上,但是它会阻塞线程 运行继续。这意味着在等待请求完成时,不能在该线程上安排其他任务。根本没有任何理由在生产代码中使用 runBlocking,因为:

  1. 如果 makeRequest 已经是阻塞调用,那么 runBlocking 几乎什么都不做。
  2. 如果 makeRequest 是暂停调用,那么 runBlocking 会使代码效率降低。在等待请求完成时,它不会将线程返回到池中。

makeRequest 是阻塞调用还是非阻塞调用取决于您使用的客户端。这是我可以推荐的非阻塞 http 客户端:https://ktor.io/clients/

问题 2:为此我会使用 Flow。您可以将其视为 Sequence 的可挂起变体。流是 cold,这意味着它不会 运行 在消费者询问其内容之前(与 hot 相反,这意味着无论消费者是否需要,生产者都会推出新的价值)。 Kotlin Flow 有一个名为 buffer 的运算符,您可以使用它让它在完全使用前一页之前请求更多页面。

该代码看起来可能与您已有的代码非常相似:

suspend fun getAllRowsFromAPI(client: Client): Flow<Row> = flow {
    var currentRequest: Request? = client.requestForNextPage()

    while(currentRequest != null) {
        val rowsInPage = client.makeRequest(currentRequest)
        emitAll(rowsInPage.asFlow())
        currentRequest = client.requestForNextPage()
    }
}.flowOn(Dispatchers.IO)
.buffer(capacity = 1)

1的容量意味着在处理较早的页面时只会再发出1次请求。您可以增加缓冲区大小以发出更多并发请求。 您应该查看 KotlinConf 2019 的演讲以了解有关流程的更多信息:https://www.youtube.com/watch?v=tYcqn48SMT8

序列绝对不是你想在这种情况下使用的东西,因为它们不是为在异步环境中工作而设计的。也许你应该看看流程和渠道,但对于你的情况,最好和最简单的选择只是延迟值的集合,因为你想一次处理所有请求(流程和渠道一个接一个地处理它们,也许缓冲区大小有限)。

下面的方法可以让你异步启动所有请求(假设makeRequest是挂起函数,支持异步请求)。当您需要结果时,只需等待最慢的请求完成。

fun getClientRequests(client: Client): List<Request> {
    val requests = ArrayList<Request>()
    var currentRequest: Request? = client.requestForNextPage()
    while (currentRequest != null) {
        requests += currentRequest
        currentRequest = client.requestForNextPage()
    }
    return requests
}

// This function is not even suspended, so it finishes almost immediately
fun getAllRowsFromAPI(client: Client): List<Deferred<Page>> =
    getClientRequests(client).map {
        /* 
         * The better practice would be making getAllRowsFromApi an extension function
         * to CoroutineScope and calling receiver scope's async function.
         * GlobalScope is used here just for simplicity.
         */
        GlobalScope.async(Dispatchers.IO) { client.makeRequest(it) }
    }

fun main() {
    val client = Client()
    val deferredPages = getAllRowsFromAPI(client) // This line executes fast
    // Here you can do whatever you want, all requests are processed in background
    Thread.sleep(999L)
    // Then, when we need results....
    val pages = runBlocking {
        deferredPages.map { it.await() }
    }
    println(pages)
    // In your case you also want to "unpack" pages and get rows, you can do it here:
    val rows = pages.flatMap { it.getRows() }
    println(rows)
}

我碰巧在 Kotlin 的 coroutines-examples 中遇到 suspendingSequence:

https://github.com/Kotlin/coroutines-examples/blob/090469080a974b962f5debfab901954a58a6e46a/examples/suspendingSequence/suspendingSequence.kt

这正是我要找的。