Kotlin 流组合

Kotlin flow combination

当有一系列并行请求需要收敛到相同的结果时,我不确定如何使用 Kotlin 流程。 作为示例,让我们使用我们有两个端点的场景,一个返回用户 ID 列表,一个返回用户是否超过 30 岁。

fun callEndpoint1(): Flow<List<String>> { 
    // Actual call to server returning a list of N userIDs
    // For simulation purposes it returns the following
    return flowOf(listOf("1", "2", "3", "4", "5"))
}

fun callEndpoint2(userId: String): Flow<Boolean> { 
    // Actual call to server returning a boolean
    // For simulation purposes it returns the following
    return flowOf(userId.toInt() % 2 == 0)
}

fun calculateTotalPeopleOver30(): Flow<Int> {
    return callEndpoint1().map{ listIds ->
               // wrong approach returning 0, should return 2
               var result = 0
               listIds.forEach{ id ->
                   callEndpoint2(id).map { isOver30 ->
                       if (isOver30) result++
                   }
               }
               result
           }
}

这种方法是错误的,因为变量 result 将在有机会存储来自不同并行调用的所有结果之前返回。 如果端点无法同时处理大量 ID,那么解决该问题的正确方法是什么?

我找到了让它工作的方法,但这只是通过另一个 class 泄漏我们需要实现的知识,这不是我想要的。 仅出于说明目的,我将其包含在此处

fun calculateTotalPeopleOver30(): Flow<List<Boolean>> {
    return callEndpoint1().map { listIds ->
        val result = arrayListOf<Boolean>()
        listIds.forEach { id ->
            result.add(callEndpoint2(id).first())
        }
        result.toList()
    }
}

fun coroutineLauncher(scope: CoroutineScope) {
    scope.launch{
        calculateTotalPeopleOver30()
            .collect { list ->
                println("people over 30 are ${list.filter{it}.size}")
            }
    }
}

您的示例不起作用,因为 Flow 在您收集它之前不会执行任何操作。这是一个懒惰的构造。这意味着这段代码实际上没有做任何事情:

listIds.forEach{ id ->
    callEndpoint2(id).map { isOver30 ->
        if (isOver30) result++
    }
}

为了解决这个问题,你需要收集callEndpoint2(id)返回的Flow。您可以使用 count 方法来执行此操作,而不是手动计数。

val result = listIds.map { id ->
    callEndpoint2(id)
        .count { it }
}.sum()

现在,还有另一个问题。计数不会并行进行。在继续下一个 callEndpoint2(id) 之前,它将完成计算一个 callEndpoint2(id) 的结果。

为此,您可以将 ID 列表转换为 Flow 并使用 flatMapMerge 并发调用 callEndpoint2flatMapMerge 的默认并发数为 16,但您可以使用 concurrency 参数配置它。结果代码是这样的:

fun calculateTotal(): Flow<Int> {
    return callEndpoint1()
        .map { 
            it.asFlow()
              .flatMapMerge { callEndpoint2(it) }
              .count { it }
        }
}