Retrofit、Coroutines 和 Suspend 函数的并行请求
Parallel request with Retrofit, Coroutines and Suspend functions
我正在使用 Retrofit 来发出一些网络请求。我还将协程与 'suspend' 函数结合使用。
我的问题是:有没有办法改进下面的代码。这个想法是并行启动多个请求并等待它们全部完成,然后再继续该功能。
lifecycleScope.launch {
try {
itemIds.forEach { itemId ->
withContext(Dispatchers.IO) { itemById[itemId] = MyService.getItem(itemId) }
}
} catch (exception: Exception) {
exception.printStackTrace()
}
Log.i(TAG, "All requests have been executed")
}
(注意 "MyService.getItem()" 是一个 'suspend' 函数。)
我想在这种情况下有比 foreach 更好的东西。
有人有想法吗?
我准备了三种方法来解决这个问题,从最简单的到最正确的。为了简化方法的介绍,我提取了这个公共代码:
lifecycleScope.launch {
val itemById = try {
fetchItems(itemIds)
} catch (exception: Exception) {
exception.printStackTrace()
}
Log.i(TAG, "Fetched these items: $itemById")
}
在我继续之前,一般注意事项:您的 getItem()
函数是可暂停的,您无需将其提交给 IO
调度程序。您所有的协程都可以 运行 在主线程上。
现在让我们看看如何实现 fetchItems(itemIds)
。
1。简单的 forEach
这里利用所有协程代码都可以在主线程运行:
suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> {
val itemById = mutableMapOf<Long, Item>()
coroutineScope {
itemIds.forEach { itemId ->
launch { itemById[itemId] = MyService.getItem(itemId) }
}
}
return itemById
}
coroutineScope
将等待您 launch
在其中的所有协程。尽管它们都 运行 彼此并发,但启动的协程仍会分派到单个(主)线程,因此从它们中的每一个更新地图不存在并发问题。
2。线程安全变体
它利用单线程上下文的属性这一事实可以看作是第一种方法的局限性:它没有推广到基于线程池的上下文。我们可以依靠async-await
机制来避免这个限制:
suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = coroutineScope {
itemIds.map { itemId -> async { itemId to MyService.getItem(itemId) } }
.map { it.await() }
.toMap()
}
这里我们依赖 Collection.map()
的两个不明显的属性:
- 它急切地执行所有转换,因此在进入第二阶段之前,对
Deferred<Pair<Long, Item>>
集合的第一次转换已经完成,我们等待所有这些。
- 它是一个内联函数,它允许我们在其中编写可挂起的代码,即使该函数本身不是
suspend fun
并且得到一个不可挂起的 lambda (Deferred<T>) -> T
.
这意味着所有的获取都是同时完成的,但是地图是在一个协程中组装的。
3。具有改进的并发控制的基于流的方法
上面为我们解决了并发问题,但是没有背压。如果您的输入列表非常大,您需要限制同时发出的网络请求数。
您可以使用基于 Flow
的成语来做到这一点:
suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = itemIds
.asFlow()
.flatMapMerge(concurrency = MAX_CONCURRENT_REQUESTS) { itemId ->
flow { emit(itemId to MyService.getItem(itemId)) }
}
.toMap()
这里的魔法在于 .flatMapMerge
操作。你给它一个函数 (T) -> Flow<R>
,它会在所有输入上顺序执行它,然后它会同时收集它得到的所有流。请注意,我无法将 flow { emit(getItem()) } }
简化为 flowOf(getItem())
,因为 getItem()
必须在收集流量时延迟调用。
Flow.toMap()
目前标准库中没有提供,所以这里是:
suspend fun <K, V> Flow<Pair<K, V>>.toMap(): Map<K, V> {
val result = mutableMapOf<K, V>()
collect { (k, v) -> result[k] = v }
return result
}
如果您正在寻找一种更好的编写和消除方法foreach
lifecycleScope.launch {
try {
itemIds.asFlow()
.flowOn(Dispatchers.IO)
.collect{ itemId -> itemById[itemId] = MyService.getItem(itemId)}
} catch (exception: Exception) {
exception.printStackTrace()
}
Log.i(TAG, "All requests have been executed")
}
另外请看lifecycleScope
我怀疑它正在使用Dispatchers.Main
。如果是这种情况,您可以删除此 .flowOn(Dispatchers.IO)
额外的调度程序声明。
我正在使用 Retrofit 来发出一些网络请求。我还将协程与 'suspend' 函数结合使用。
我的问题是:有没有办法改进下面的代码。这个想法是并行启动多个请求并等待它们全部完成,然后再继续该功能。
lifecycleScope.launch {
try {
itemIds.forEach { itemId ->
withContext(Dispatchers.IO) { itemById[itemId] = MyService.getItem(itemId) }
}
} catch (exception: Exception) {
exception.printStackTrace()
}
Log.i(TAG, "All requests have been executed")
}
(注意 "MyService.getItem()" 是一个 'suspend' 函数。)
我想在这种情况下有比 foreach 更好的东西。
有人有想法吗?
我准备了三种方法来解决这个问题,从最简单的到最正确的。为了简化方法的介绍,我提取了这个公共代码:
lifecycleScope.launch {
val itemById = try {
fetchItems(itemIds)
} catch (exception: Exception) {
exception.printStackTrace()
}
Log.i(TAG, "Fetched these items: $itemById")
}
在我继续之前,一般注意事项:您的 getItem()
函数是可暂停的,您无需将其提交给 IO
调度程序。您所有的协程都可以 运行 在主线程上。
现在让我们看看如何实现 fetchItems(itemIds)
。
1。简单的 forEach
这里利用所有协程代码都可以在主线程运行:
suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> {
val itemById = mutableMapOf<Long, Item>()
coroutineScope {
itemIds.forEach { itemId ->
launch { itemById[itemId] = MyService.getItem(itemId) }
}
}
return itemById
}
coroutineScope
将等待您 launch
在其中的所有协程。尽管它们都 运行 彼此并发,但启动的协程仍会分派到单个(主)线程,因此从它们中的每一个更新地图不存在并发问题。
2。线程安全变体
它利用单线程上下文的属性这一事实可以看作是第一种方法的局限性:它没有推广到基于线程池的上下文。我们可以依靠async-await
机制来避免这个限制:
suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = coroutineScope {
itemIds.map { itemId -> async { itemId to MyService.getItem(itemId) } }
.map { it.await() }
.toMap()
}
这里我们依赖 Collection.map()
的两个不明显的属性:
- 它急切地执行所有转换,因此在进入第二阶段之前,对
Deferred<Pair<Long, Item>>
集合的第一次转换已经完成,我们等待所有这些。 - 它是一个内联函数,它允许我们在其中编写可挂起的代码,即使该函数本身不是
suspend fun
并且得到一个不可挂起的 lambda(Deferred<T>) -> T
.
这意味着所有的获取都是同时完成的,但是地图是在一个协程中组装的。
3。具有改进的并发控制的基于流的方法
上面为我们解决了并发问题,但是没有背压。如果您的输入列表非常大,您需要限制同时发出的网络请求数。
您可以使用基于 Flow
的成语来做到这一点:
suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = itemIds
.asFlow()
.flatMapMerge(concurrency = MAX_CONCURRENT_REQUESTS) { itemId ->
flow { emit(itemId to MyService.getItem(itemId)) }
}
.toMap()
这里的魔法在于 .flatMapMerge
操作。你给它一个函数 (T) -> Flow<R>
,它会在所有输入上顺序执行它,然后它会同时收集它得到的所有流。请注意,我无法将 flow { emit(getItem()) } }
简化为 flowOf(getItem())
,因为 getItem()
必须在收集流量时延迟调用。
Flow.toMap()
目前标准库中没有提供,所以这里是:
suspend fun <K, V> Flow<Pair<K, V>>.toMap(): Map<K, V> {
val result = mutableMapOf<K, V>()
collect { (k, v) -> result[k] = v }
return result
}
如果您正在寻找一种更好的编写和消除方法foreach
lifecycleScope.launch {
try {
itemIds.asFlow()
.flowOn(Dispatchers.IO)
.collect{ itemId -> itemById[itemId] = MyService.getItem(itemId)}
} catch (exception: Exception) {
exception.printStackTrace()
}
Log.i(TAG, "All requests have been executed")
}
另外请看lifecycleScope
我怀疑它正在使用Dispatchers.Main
。如果是这种情况,您可以删除此 .flowOn(Dispatchers.IO)
额外的调度程序声明。