如何在 Kotlin 中做并行 flatMap?

How to do parallel flatMap in Kotlin?

我需要做平行平面图。 假设我有这段代码:

val coll: List<Set<Int>> = ...
coll.flatMap{set -> setOf(set, set + 1)}

我需要这样的东西:

coll.pFlatMap{set -> setOf(set, set + 1)} // parallel execution

Kotlin 不提供任何开箱即用的线程。 但是你可以使用 kotlinx.coroutines 来做这样的事情:

val coll: List<Set<Int>> = ...
val result = coll
 .map {set -> 
    // Run each task in own coroutine,
    // you can limit concurrency using custom coroutine dispatcher
    async { doSomethingWithSet(set) } 
 }
 .flatMap { deferred -> 
    // Await results and use flatMap
    deferred.await() // You can handle errors here
 }

或者,您也可以不使用协程:

fun <T, R> Collection<T>.pFlatMap(transform: (T) -> Collection<R>): List<R> =
    parallelStream().flatMap { transform(it).stream() }.toList()

此解决方案需要 JDK 8 或更高版本的 Kotlin。

你也可以让它更通用(类似于 Kotlin 的 flatMap):

fun <T, R> Iterable<T>.pFlatMap(transform: (T) -> Iterable<R>): List<R> =
    toList()
        .parallelStream()
        .flatMap { transform(it).toList().stream() }
        .toList()

您需要先添加协程范围(runBlocking),然后将延迟执行应用到您的函数(异步):

val coll: List<Set<Int>> = listOf()
val x = runBlocking {
    coll.map{ set ->
        async {
            setOf(set, set + 1)
        }
    }.awaitAll().flatten()
}

x ....