Kotlin 流组合
Kotlin flow combination
当有一系列并行请求需要收敛到相同的结果时,我不确定如何使用 Kotlin 流程。
作为示例,让我们使用我们有两个端点的场景,一个返回用户 ID 列表,一个返回用户是否超过 30 岁。
fun callEndpoint1(): Flow<List<String>> {
// Actual call to server returning a list of N userIDs
// For simulation purposes it returns the following
return flowOf(listOf("1", "2", "3", "4", "5"))
}
fun callEndpoint2(userId: String): Flow<Boolean> {
// Actual call to server returning a boolean
// For simulation purposes it returns the following
return flowOf(userId.toInt() % 2 == 0)
}
fun calculateTotalPeopleOver30(): Flow<Int> {
return callEndpoint1().map{ listIds ->
// wrong approach returning 0, should return 2
var result = 0
listIds.forEach{ id ->
callEndpoint2(id).map { isOver30 ->
if (isOver30) result++
}
}
result
}
}
这种方法是错误的,因为变量 result
将在有机会存储来自不同并行调用的所有结果之前返回。
如果端点无法同时处理大量 ID,那么解决该问题的正确方法是什么?
我找到了让它工作的方法,但这只是通过另一个 class 泄漏我们需要实现的知识,这不是我想要的。
仅出于说明目的,我将其包含在此处
fun calculateTotalPeopleOver30(): Flow<List<Boolean>> {
return callEndpoint1().map { listIds ->
val result = arrayListOf<Boolean>()
listIds.forEach { id ->
result.add(callEndpoint2(id).first())
}
result.toList()
}
}
fun coroutineLauncher(scope: CoroutineScope) {
scope.launch{
calculateTotalPeopleOver30()
.collect { list ->
println("people over 30 are ${list.filter{it}.size}")
}
}
}
您的示例不起作用,因为 Flow
在您收集它之前不会执行任何操作。这是一个懒惰的构造。这意味着这段代码实际上没有做任何事情:
listIds.forEach{ id ->
callEndpoint2(id).map { isOver30 ->
if (isOver30) result++
}
}
为了解决这个问题,你需要收集callEndpoint2(id)
返回的Flow
。您可以使用 count
方法来执行此操作,而不是手动计数。
val result = listIds.map { id ->
callEndpoint2(id)
.count { it }
}.sum()
现在,还有另一个问题。计数不会并行进行。在继续下一个 callEndpoint2(id)
之前,它将完成计算一个 callEndpoint2(id)
的结果。
为此,您可以将 ID 列表转换为 Flow
并使用 flatMapMerge
并发调用 callEndpoint2
。 flatMapMerge
的默认并发数为 16,但您可以使用 concurrency
参数配置它。结果代码是这样的:
fun calculateTotal(): Flow<Int> {
return callEndpoint1()
.map {
it.asFlow()
.flatMapMerge { callEndpoint2(it) }
.count { it }
}
}
当有一系列并行请求需要收敛到相同的结果时,我不确定如何使用 Kotlin 流程。 作为示例,让我们使用我们有两个端点的场景,一个返回用户 ID 列表,一个返回用户是否超过 30 岁。
fun callEndpoint1(): Flow<List<String>> {
// Actual call to server returning a list of N userIDs
// For simulation purposes it returns the following
return flowOf(listOf("1", "2", "3", "4", "5"))
}
fun callEndpoint2(userId: String): Flow<Boolean> {
// Actual call to server returning a boolean
// For simulation purposes it returns the following
return flowOf(userId.toInt() % 2 == 0)
}
fun calculateTotalPeopleOver30(): Flow<Int> {
return callEndpoint1().map{ listIds ->
// wrong approach returning 0, should return 2
var result = 0
listIds.forEach{ id ->
callEndpoint2(id).map { isOver30 ->
if (isOver30) result++
}
}
result
}
}
这种方法是错误的,因为变量 result
将在有机会存储来自不同并行调用的所有结果之前返回。
如果端点无法同时处理大量 ID,那么解决该问题的正确方法是什么?
我找到了让它工作的方法,但这只是通过另一个 class 泄漏我们需要实现的知识,这不是我想要的。 仅出于说明目的,我将其包含在此处
fun calculateTotalPeopleOver30(): Flow<List<Boolean>> {
return callEndpoint1().map { listIds ->
val result = arrayListOf<Boolean>()
listIds.forEach { id ->
result.add(callEndpoint2(id).first())
}
result.toList()
}
}
fun coroutineLauncher(scope: CoroutineScope) {
scope.launch{
calculateTotalPeopleOver30()
.collect { list ->
println("people over 30 are ${list.filter{it}.size}")
}
}
}
您的示例不起作用,因为 Flow
在您收集它之前不会执行任何操作。这是一个懒惰的构造。这意味着这段代码实际上没有做任何事情:
listIds.forEach{ id ->
callEndpoint2(id).map { isOver30 ->
if (isOver30) result++
}
}
为了解决这个问题,你需要收集callEndpoint2(id)
返回的Flow
。您可以使用 count
方法来执行此操作,而不是手动计数。
val result = listIds.map { id ->
callEndpoint2(id)
.count { it }
}.sum()
现在,还有另一个问题。计数不会并行进行。在继续下一个 callEndpoint2(id)
之前,它将完成计算一个 callEndpoint2(id)
的结果。
为此,您可以将 ID 列表转换为 Flow
并使用 flatMapMerge
并发调用 callEndpoint2
。 flatMapMerge
的默认并发数为 16,但您可以使用 concurrency
参数配置它。结果代码是这样的:
fun calculateTotal(): Flow<Int> {
return callEndpoint1()
.map {
it.asFlow()
.flatMapMerge { callEndpoint2(it) }
.count { it }
}
}