如何更改我的辅助函数以便收集并行处理任务的结果
How to change my helper function so that is collects the results of the parallel processing tasks
我写了这个辅助函数,这样我就可以轻松地并行处理一个列表,并且只在所有工作完成后才继续执行代码。当您不需要 return 结果时,它工作得很好。
(我知道每次都创建新池不是最佳做法,它可以很容易地移出,但我想让示例保持简单。)
fun recursiveAction(action: () -> Unit): RecursiveAction {
return object : RecursiveAction() {
override fun compute() {
action()
}
}
}
fun <T> List<T>.parallelForEach(parallelSize: Int, action: (T) -> Unit) {
ForkJoinPool(parallelSize).invoke(recursiveAction {
this.parallelStream().forEach { action(it) }
})
}
使用示例:
val myList: List<SomeClass> [...]
val parallelSize: Int = 8
myList.parallelForEach(parallelSize) { listElement ->
//Some task here
}
当您想将结果收集回列表时,是否有任何方法可以制作类似的辅助构造?
我知道我必须使用 RecursiveTask 而不是 RecursiveAction,但我无法像上面那样编写一个辅助函数来包装它。
我想这样使用它:
val myList: List<SomeClass> [...]
val parallelSize: Int = 8
val result: List<SomeClass> = myList.parallelForEach(parallelSize) { listElement ->
//Some task here
}
或者,是否有更简单的方法来一起完成此操作?
回答者JeffMurdock over on Reddit
fun <T> recursiveTask(action: () -> T): RecursiveTask<T> {
return object : RecursiveTask<T>() {
override fun compute(): T {
return action()
}
}
}
fun <T, E> List<T>.parallelForEach(parallelSize: Int, action: (T) -> E): List<E> {
val pool = ForkJoinPool(parallelSize)
val result = mutableListOf<ForkJoinTask<E>>()
for (item in this) {
result.add(pool.submit(recursiveTask {
action(item)
}))
}
return result.map { it.join() }
}
fun main(args: Array<String>) {
val list = listOf(1, 2, 3)
list.parallelForEach(3) { it + 2 }.forEach { println(it) }
}
我写了这个辅助函数,这样我就可以轻松地并行处理一个列表,并且只在所有工作完成后才继续执行代码。当您不需要 return 结果时,它工作得很好。
(我知道每次都创建新池不是最佳做法,它可以很容易地移出,但我想让示例保持简单。)
fun recursiveAction(action: () -> Unit): RecursiveAction {
return object : RecursiveAction() {
override fun compute() {
action()
}
}
}
fun <T> List<T>.parallelForEach(parallelSize: Int, action: (T) -> Unit) {
ForkJoinPool(parallelSize).invoke(recursiveAction {
this.parallelStream().forEach { action(it) }
})
}
使用示例:
val myList: List<SomeClass> [...]
val parallelSize: Int = 8
myList.parallelForEach(parallelSize) { listElement ->
//Some task here
}
当您想将结果收集回列表时,是否有任何方法可以制作类似的辅助构造?
我知道我必须使用 RecursiveTask 而不是 RecursiveAction,但我无法像上面那样编写一个辅助函数来包装它。
我想这样使用它:
val myList: List<SomeClass> [...]
val parallelSize: Int = 8
val result: List<SomeClass> = myList.parallelForEach(parallelSize) { listElement ->
//Some task here
}
或者,是否有更简单的方法来一起完成此操作?
回答者JeffMurdock over on Reddit
fun <T> recursiveTask(action: () -> T): RecursiveTask<T> {
return object : RecursiveTask<T>() {
override fun compute(): T {
return action()
}
}
}
fun <T, E> List<T>.parallelForEach(parallelSize: Int, action: (T) -> E): List<E> {
val pool = ForkJoinPool(parallelSize)
val result = mutableListOf<ForkJoinTask<E>>()
for (item in this) {
result.add(pool.submit(recursiveTask {
action(item)
}))
}
return result.map { it.join() }
}
fun main(args: Array<String>) {
val list = listOf(1, 2, 3)
list.parallelForEach(3) { it + 2 }.forEach { println(it) }
}