如何更改我的辅助函数以便收集并行处理任务的结果

How to change my helper function so that is collects the results of the parallel processing tasks

我写了这个辅助函数,这样我就可以轻松地并行处理一个列表,并且只在所有工作完成后才继续执行代码。当您不需要 return 结果时,它工作得很好。

(我知道每次都创建新池不是最佳做法,它可以很容易地移出,但我想让示例保持简单。)

fun recursiveAction(action: () -> Unit): RecursiveAction {
    return object : RecursiveAction() {
        override fun compute() {
            action()
        }
    }
}

fun <T> List<T>.parallelForEach(parallelSize: Int, action: (T) -> Unit) {
    ForkJoinPool(parallelSize).invoke(recursiveAction {
        this.parallelStream().forEach { action(it) }
    })
}

使用示例:

val myList: List<SomeClass> [...]
val parallelSize: Int = 8

myList.parallelForEach(parallelSize) { listElement ->
   //Some task here
}

当您想将结果收集回列表时,是否有任何方法可以制作类似的辅助构造?

我知道我必须使用 RecursiveTask 而不是 RecursiveAction,但我无法像上面那样编写一个辅助函数来包装它。

我想这样使用它:

val myList: List<SomeClass> [...]
val parallelSize: Int = 8

val result: List<SomeClass> = myList.parallelForEach(parallelSize) { listElement ->
   //Some task here
}

或者,是否有更简单的方法来一起完成此操作?

回答者JeffMurdock over on Reddit

fun <T> recursiveTask(action: () -> T): RecursiveTask<T> {
    return object : RecursiveTask<T>() {
        override fun compute(): T {
            return action()
        }
    }
}

fun <T, E> List<T>.parallelForEach(parallelSize: Int, action: (T) -> E): List<E> {
    val pool = ForkJoinPool(parallelSize)
    val result = mutableListOf<ForkJoinTask<E>>()
    for (item in this) {
        result.add(pool.submit(recursiveTask {
            action(item)
        }))
    }
    return result.map { it.join() }
}

fun main(args: Array<String>) {
    val list = listOf(1, 2, 3)
    list.parallelForEach(3) { it + 2 }.forEach { println(it) }
}