Kotlin 将顺序 IO 调用包装为 Sequence
Kotlin wrap sequential IO calls as a Sequence
我需要处理来自分页 API 端点的所有结果。我想按顺序显示所有结果。
我想出了以下(略微伪编码):
suspend fun getAllRowsFromAPI(client: Client): Sequence<Row> {
var currentRequest: Request? = client.requestForNextPage()
return withContext(Dispatchers.IO) {
sequence {
while(currentRequest != null) {
var rowsInPage = runBlocking { client.makeRequest(currentRequest) }
currentRequest = client.requestForNextPage()
yieldAll(rowsInPage)
}
}
}
}
这个功能有效,但我不确定一些事情:
- 在
runBlocking
内部发生的 API 请求是否仍在 IO 调度程序中发生?
- 有没有办法重构代码以在产生当前结果之前启动下一个请求,然后等待它?
问题 1: API-request 仍然会 运行 在 IO-dispatcher 上,但是它会阻塞线程 运行继续。这意味着在等待请求完成时,不能在该线程上安排其他任务。根本没有任何理由在生产代码中使用 runBlocking
,因为:
- 如果
makeRequest
已经是阻塞调用,那么 runBlocking
几乎什么都不做。
- 如果
makeRequest
是暂停调用,那么 runBlocking
会使代码效率降低。在等待请求完成时,它不会将线程返回到池中。
makeRequest
是阻塞调用还是非阻塞调用取决于您使用的客户端。这是我可以推荐的非阻塞 http 客户端:https://ktor.io/clients/
问题 2:为此我会使用 Flow
。您可以将其视为 Sequence
的可挂起变体。流是 cold,这意味着它不会 运行 在消费者询问其内容之前(与 hot 相反,这意味着无论消费者是否需要,生产者都会推出新的价值)。 Kotlin Flow
有一个名为 buffer
的运算符,您可以使用它让它在完全使用前一页之前请求更多页面。
该代码看起来可能与您已有的代码非常相似:
suspend fun getAllRowsFromAPI(client: Client): Flow<Row> = flow {
var currentRequest: Request? = client.requestForNextPage()
while(currentRequest != null) {
val rowsInPage = client.makeRequest(currentRequest)
emitAll(rowsInPage.asFlow())
currentRequest = client.requestForNextPage()
}
}.flowOn(Dispatchers.IO)
.buffer(capacity = 1)
1的容量意味着在处理较早的页面时只会再发出1次请求。您可以增加缓冲区大小以发出更多并发请求。
您应该查看 KotlinConf 2019 的演讲以了解有关流程的更多信息:https://www.youtube.com/watch?v=tYcqn48SMT8
序列绝对不是你想在这种情况下使用的东西,因为它们不是为在异步环境中工作而设计的。也许你应该看看流程和渠道,但对于你的情况,最好和最简单的选择只是延迟值的集合,因为你想一次处理所有请求(流程和渠道一个接一个地处理它们,也许缓冲区大小有限)。
下面的方法可以让你异步启动所有请求(假设makeRequest
是挂起函数,支持异步请求)。当您需要结果时,只需等待最慢的请求完成。
fun getClientRequests(client: Client): List<Request> {
val requests = ArrayList<Request>()
var currentRequest: Request? = client.requestForNextPage()
while (currentRequest != null) {
requests += currentRequest
currentRequest = client.requestForNextPage()
}
return requests
}
// This function is not even suspended, so it finishes almost immediately
fun getAllRowsFromAPI(client: Client): List<Deferred<Page>> =
getClientRequests(client).map {
/*
* The better practice would be making getAllRowsFromApi an extension function
* to CoroutineScope and calling receiver scope's async function.
* GlobalScope is used here just for simplicity.
*/
GlobalScope.async(Dispatchers.IO) { client.makeRequest(it) }
}
fun main() {
val client = Client()
val deferredPages = getAllRowsFromAPI(client) // This line executes fast
// Here you can do whatever you want, all requests are processed in background
Thread.sleep(999L)
// Then, when we need results....
val pages = runBlocking {
deferredPages.map { it.await() }
}
println(pages)
// In your case you also want to "unpack" pages and get rows, you can do it here:
val rows = pages.flatMap { it.getRows() }
println(rows)
}
我碰巧在 Kotlin 的 coroutines-examples
中遇到 suspendingSequence
:
这正是我要找的。
我需要处理来自分页 API 端点的所有结果。我想按顺序显示所有结果。
我想出了以下(略微伪编码):
suspend fun getAllRowsFromAPI(client: Client): Sequence<Row> {
var currentRequest: Request? = client.requestForNextPage()
return withContext(Dispatchers.IO) {
sequence {
while(currentRequest != null) {
var rowsInPage = runBlocking { client.makeRequest(currentRequest) }
currentRequest = client.requestForNextPage()
yieldAll(rowsInPage)
}
}
}
}
这个功能有效,但我不确定一些事情:
- 在
runBlocking
内部发生的 API 请求是否仍在 IO 调度程序中发生? - 有没有办法重构代码以在产生当前结果之前启动下一个请求,然后等待它?
问题 1: API-request 仍然会 运行 在 IO-dispatcher 上,但是它会阻塞线程 运行继续。这意味着在等待请求完成时,不能在该线程上安排其他任务。根本没有任何理由在生产代码中使用 runBlocking
,因为:
- 如果
makeRequest
已经是阻塞调用,那么runBlocking
几乎什么都不做。 - 如果
makeRequest
是暂停调用,那么runBlocking
会使代码效率降低。在等待请求完成时,它不会将线程返回到池中。
makeRequest
是阻塞调用还是非阻塞调用取决于您使用的客户端。这是我可以推荐的非阻塞 http 客户端:https://ktor.io/clients/
问题 2:为此我会使用 Flow
。您可以将其视为 Sequence
的可挂起变体。流是 cold,这意味着它不会 运行 在消费者询问其内容之前(与 hot 相反,这意味着无论消费者是否需要,生产者都会推出新的价值)。 Kotlin Flow
有一个名为 buffer
的运算符,您可以使用它让它在完全使用前一页之前请求更多页面。
该代码看起来可能与您已有的代码非常相似:
suspend fun getAllRowsFromAPI(client: Client): Flow<Row> = flow {
var currentRequest: Request? = client.requestForNextPage()
while(currentRequest != null) {
val rowsInPage = client.makeRequest(currentRequest)
emitAll(rowsInPage.asFlow())
currentRequest = client.requestForNextPage()
}
}.flowOn(Dispatchers.IO)
.buffer(capacity = 1)
1的容量意味着在处理较早的页面时只会再发出1次请求。您可以增加缓冲区大小以发出更多并发请求。 您应该查看 KotlinConf 2019 的演讲以了解有关流程的更多信息:https://www.youtube.com/watch?v=tYcqn48SMT8
序列绝对不是你想在这种情况下使用的东西,因为它们不是为在异步环境中工作而设计的。也许你应该看看流程和渠道,但对于你的情况,最好和最简单的选择只是延迟值的集合,因为你想一次处理所有请求(流程和渠道一个接一个地处理它们,也许缓冲区大小有限)。
下面的方法可以让你异步启动所有请求(假设makeRequest
是挂起函数,支持异步请求)。当您需要结果时,只需等待最慢的请求完成。
fun getClientRequests(client: Client): List<Request> {
val requests = ArrayList<Request>()
var currentRequest: Request? = client.requestForNextPage()
while (currentRequest != null) {
requests += currentRequest
currentRequest = client.requestForNextPage()
}
return requests
}
// This function is not even suspended, so it finishes almost immediately
fun getAllRowsFromAPI(client: Client): List<Deferred<Page>> =
getClientRequests(client).map {
/*
* The better practice would be making getAllRowsFromApi an extension function
* to CoroutineScope and calling receiver scope's async function.
* GlobalScope is used here just for simplicity.
*/
GlobalScope.async(Dispatchers.IO) { client.makeRequest(it) }
}
fun main() {
val client = Client()
val deferredPages = getAllRowsFromAPI(client) // This line executes fast
// Here you can do whatever you want, all requests are processed in background
Thread.sleep(999L)
// Then, when we need results....
val pages = runBlocking {
deferredPages.map { it.await() }
}
println(pages)
// In your case you also want to "unpack" pages and get rows, you can do it here:
val rows = pages.flatMap { it.getRows() }
println(rows)
}
我碰巧在 Kotlin 的 coroutines-examples
中遇到 suspendingSequence
:
这正是我要找的。