如何在 Kotlin 中做并行 flatMap?
How to do parallel flatMap in Kotlin?
我需要做平行平面图。
假设我有这段代码:
val coll: List<Set<Int>> = ...
coll.flatMap{set -> setOf(set, set + 1)}
我需要这样的东西:
coll.pFlatMap{set -> setOf(set, set + 1)} // parallel execution
Kotlin 不提供任何开箱即用的线程。
但是你可以使用 kotlinx.coroutines 来做这样的事情:
val coll: List<Set<Int>> = ...
val result = coll
.map {set ->
// Run each task in own coroutine,
// you can limit concurrency using custom coroutine dispatcher
async { doSomethingWithSet(set) }
}
.flatMap { deferred ->
// Await results and use flatMap
deferred.await() // You can handle errors here
}
或者,您也可以不使用协程:
fun <T, R> Collection<T>.pFlatMap(transform: (T) -> Collection<R>): List<R> =
parallelStream().flatMap { transform(it).stream() }.toList()
此解决方案需要 JDK 8 或更高版本的 Kotlin。
你也可以让它更通用(类似于 Kotlin 的 flatMap
):
fun <T, R> Iterable<T>.pFlatMap(transform: (T) -> Iterable<R>): List<R> =
toList()
.parallelStream()
.flatMap { transform(it).toList().stream() }
.toList()
您需要先添加协程范围(runBlocking),然后将延迟执行应用到您的函数(异步):
val coll: List<Set<Int>> = listOf()
val x = runBlocking {
coll.map{ set ->
async {
setOf(set, set + 1)
}
}.awaitAll().flatten()
}
x ....
我需要做平行平面图。 假设我有这段代码:
val coll: List<Set<Int>> = ...
coll.flatMap{set -> setOf(set, set + 1)}
我需要这样的东西:
coll.pFlatMap{set -> setOf(set, set + 1)} // parallel execution
Kotlin 不提供任何开箱即用的线程。 但是你可以使用 kotlinx.coroutines 来做这样的事情:
val coll: List<Set<Int>> = ...
val result = coll
.map {set ->
// Run each task in own coroutine,
// you can limit concurrency using custom coroutine dispatcher
async { doSomethingWithSet(set) }
}
.flatMap { deferred ->
// Await results and use flatMap
deferred.await() // You can handle errors here
}
或者,您也可以不使用协程:
fun <T, R> Collection<T>.pFlatMap(transform: (T) -> Collection<R>): List<R> =
parallelStream().flatMap { transform(it).stream() }.toList()
此解决方案需要 JDK 8 或更高版本的 Kotlin。
你也可以让它更通用(类似于 Kotlin 的 flatMap
):
fun <T, R> Iterable<T>.pFlatMap(transform: (T) -> Iterable<R>): List<R> =
toList()
.parallelStream()
.flatMap { transform(it).toList().stream() }
.toList()
您需要先添加协程范围(runBlocking),然后将延迟执行应用到您的函数(异步):
val coll: List<Set<Int>> = listOf()
val x = runBlocking {
coll.map{ set ->
async {
setOf(set, set + 1)
}
}.awaitAll().flatten()
}
x ....