为什么并行范围处理比基于未来的并行处理(N-queens 示例)花费更多的时间?
Why parallel range processing takes lot more time than Future based parallel processing (N-queens example)?
我想出了两个并行的解决方案,以尽可能快地找到 N 皇后问题的一个解决方案。
第一个使用 Futures
import scala.collection.immutable.HashSet
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
/**
* Created by mikel on 17/06/16.
*/
object Queens2 extends App {
val time = System.currentTimeMillis()
val boardSize = 200
def firstResult(): Future[List[Int]] = {
def iterate(solution: Vector[(Int, Int)], remainingElements: Set[Int], invalidSum: HashSet[Int], invalidMinus: HashSet[Int]): Stream[List[(Int, Int)]] = {
def isSafe(queens: Vector[(Int, Int)], queen: Int): Boolean = {
!invalidSum.contains(queens.size + queen) && !invalidMinus.contains(queens.size - queen)
}
if (solution.size == boardSize)
Stream(solution.toList)
else {
for {
nextQueen <- remainingElements.toStream if isSafe(solution, nextQueen)
res <- iterate(solution :+(solution.size, nextQueen), remainingElements - nextQueen, invalidSum + (solution.size + nextQueen), invalidMinus + (solution.size - nextQueen))
} yield (res)
}
}
val promise = Promise[List[Int]]()
val allElements = (0 until boardSize).toSet
val range = (0 until boardSize)
range.foreach(pos => {
// HERE we parallelize the execution
Future {
promise.trySuccess(iterate(Vector((0, pos)), allElements - pos, HashSet(pos), HashSet(-pos)).map(_.map(_._2)).head)
}
}
)
promise.future
}
val resFuture = firstResult()
resFuture.onSuccess { case res =>
println("Finished in: " + (System.currentTimeMillis() - time))
println(res)
System.exit(0)
}
Await.result(Promise().future, Duration.Inf)
}
另一个使用 ParRange
import scala.collection.immutable.HashSet
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
/**
* Created by mikel on 17/06/16.
*/
object Queens extends App {
val time = System.currentTimeMillis()
val boardSize = 200
def firstResult(): Future[List[Int]] = {
def iterate(solution: Vector[(Int, Int)], remainingElements: Set[Int], invalidSum: HashSet[Int], invalidMinus: HashSet[Int]): Stream[List[(Int, Int)]] = {
def isSafe(queens: Vector[(Int, Int)], queen: Int): Boolean = {
!invalidSum.contains(queens.size + queen) && !invalidMinus.contains(queens.size - queen)
}
if (solution.size == boardSize)
Stream(solution.toList)
else {
for {
nextQueen <- remainingElements.toStream if isSafe(solution, nextQueen)
res <- iterate(solution :+(solution.size, nextQueen), remainingElements - nextQueen, invalidSum + (solution.size + nextQueen), invalidMinus + (solution.size - nextQueen))
} yield (res)
}
}
val promise = Promise[List[Int]]()
Future {
val allElements = (0 until boardSize).toSet
// HERE we parallelize the execution
val range = (0 until boardSize).par
range.foreach(pos => {
promise.trySuccess(iterate(Vector((0, pos)), allElements - pos, HashSet(pos), HashSet(-pos)).map(_.map(_._2)).head)
}
)
}
promise.future
}
val resFuture = firstResult()
resFuture.onSuccess { case res =>
println("Finished in: " + (System.currentTimeMillis() - time))
println(res)
System.exit(0)
}
Await.result(Promise().future, Duration.Inf)
}
在使用 200 尺寸的电路板执行这两个程序后,我使用第一种方法获得了更快的解决方案(显然在一段时间后,第二种解决方案的并行化水平下降了),有人知道为什么会这样吗?
因此,您的第二个代码段的这个固定版本运行速度足够快:
import java.util.concurrent.Executors
import scala.collection.immutable.HashSet
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
/**
* Created by mikel on 17/06/16.
*/
object Queens extends App {
val time = System.currentTimeMillis()
val boardSize = 200
def firstResult(): Future[List[Int]] = {
def iterate(solution: Vector[(Int, Int)], remainingElements: Set[Int], invalidSum: HashSet[Int], invalidMinus: HashSet[Int]): Stream[List[(Int, Int)]] = {
def isSafe(queens: Vector[(Int, Int)], queen: Int): Boolean = {
!invalidSum.contains(queens.size + queen) && !invalidMinus.contains(queens.size - queen)
}
if (solution.size == boardSize)
Stream(solution.toList)
else {
for {
nextQueen <- remainingElements.toStream if isSafe(solution, nextQueen)
res <- iterate(solution :+(solution.size, nextQueen), remainingElements - nextQueen, invalidSum + (solution.size + nextQueen), invalidMinus + (solution.size - nextQueen))
} yield (res)
}
}
val futureExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(20))
val promise = Promise[List[Int]]()
Future({
val allElements = (0 until boardSize).toSet
// HERE we parallelize the execution
val range = (0 until boardSize).par
range.foreach(pos => {
promise.trySuccess(iterate(Vector((0, pos)), allElements - pos, HashSet(pos), HashSet(-pos)).map(_.map(_._2)).head)
}
)
})(futureExecutor)
promise.future
}
val resFuture = firstResult()
resFuture.onSuccess({ case res =>
println("Finished in: " + (System.currentTimeMillis() - time))
println(res)
System.exit(0)
})(scala.concurrent.ExecutionContext.Implicits.global)
Await.result(Promise().future, Duration.Inf)
}
如您所见,我引入了单独的执行程序来等待结果和计算您的未来。为了使它更明显,我将它们显式化了,但当然你可以使用隐式。
你的第二个代码片段中的问题是线程池在默认执行上下文中耗尽 (scala.concurrent.ExecutionContext.Implicits.global
),因此你的承诺将在几乎所有计算完成之前不会触发。
ParRange
默认使用全局上下文 TaskSupport
:
//...
private[parallel] def getTaskSupport: TaskSupport = new ExecutionContextTaskSupport
//...
class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.ExecutionContext.global)
extends TaskSupport with ExecutionContextTasks
我想出了两个并行的解决方案,以尽可能快地找到 N 皇后问题的一个解决方案。
第一个使用 Futures
import scala.collection.immutable.HashSet
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
/**
* Created by mikel on 17/06/16.
*/
object Queens2 extends App {
val time = System.currentTimeMillis()
val boardSize = 200
def firstResult(): Future[List[Int]] = {
def iterate(solution: Vector[(Int, Int)], remainingElements: Set[Int], invalidSum: HashSet[Int], invalidMinus: HashSet[Int]): Stream[List[(Int, Int)]] = {
def isSafe(queens: Vector[(Int, Int)], queen: Int): Boolean = {
!invalidSum.contains(queens.size + queen) && !invalidMinus.contains(queens.size - queen)
}
if (solution.size == boardSize)
Stream(solution.toList)
else {
for {
nextQueen <- remainingElements.toStream if isSafe(solution, nextQueen)
res <- iterate(solution :+(solution.size, nextQueen), remainingElements - nextQueen, invalidSum + (solution.size + nextQueen), invalidMinus + (solution.size - nextQueen))
} yield (res)
}
}
val promise = Promise[List[Int]]()
val allElements = (0 until boardSize).toSet
val range = (0 until boardSize)
range.foreach(pos => {
// HERE we parallelize the execution
Future {
promise.trySuccess(iterate(Vector((0, pos)), allElements - pos, HashSet(pos), HashSet(-pos)).map(_.map(_._2)).head)
}
}
)
promise.future
}
val resFuture = firstResult()
resFuture.onSuccess { case res =>
println("Finished in: " + (System.currentTimeMillis() - time))
println(res)
System.exit(0)
}
Await.result(Promise().future, Duration.Inf)
}
另一个使用 ParRange
import scala.collection.immutable.HashSet
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
/**
* Created by mikel on 17/06/16.
*/
object Queens extends App {
val time = System.currentTimeMillis()
val boardSize = 200
def firstResult(): Future[List[Int]] = {
def iterate(solution: Vector[(Int, Int)], remainingElements: Set[Int], invalidSum: HashSet[Int], invalidMinus: HashSet[Int]): Stream[List[(Int, Int)]] = {
def isSafe(queens: Vector[(Int, Int)], queen: Int): Boolean = {
!invalidSum.contains(queens.size + queen) && !invalidMinus.contains(queens.size - queen)
}
if (solution.size == boardSize)
Stream(solution.toList)
else {
for {
nextQueen <- remainingElements.toStream if isSafe(solution, nextQueen)
res <- iterate(solution :+(solution.size, nextQueen), remainingElements - nextQueen, invalidSum + (solution.size + nextQueen), invalidMinus + (solution.size - nextQueen))
} yield (res)
}
}
val promise = Promise[List[Int]]()
Future {
val allElements = (0 until boardSize).toSet
// HERE we parallelize the execution
val range = (0 until boardSize).par
range.foreach(pos => {
promise.trySuccess(iterate(Vector((0, pos)), allElements - pos, HashSet(pos), HashSet(-pos)).map(_.map(_._2)).head)
}
)
}
promise.future
}
val resFuture = firstResult()
resFuture.onSuccess { case res =>
println("Finished in: " + (System.currentTimeMillis() - time))
println(res)
System.exit(0)
}
Await.result(Promise().future, Duration.Inf)
}
在使用 200 尺寸的电路板执行这两个程序后,我使用第一种方法获得了更快的解决方案(显然在一段时间后,第二种解决方案的并行化水平下降了),有人知道为什么会这样吗?
因此,您的第二个代码段的这个固定版本运行速度足够快:
import java.util.concurrent.Executors
import scala.collection.immutable.HashSet
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
/**
* Created by mikel on 17/06/16.
*/
object Queens extends App {
val time = System.currentTimeMillis()
val boardSize = 200
def firstResult(): Future[List[Int]] = {
def iterate(solution: Vector[(Int, Int)], remainingElements: Set[Int], invalidSum: HashSet[Int], invalidMinus: HashSet[Int]): Stream[List[(Int, Int)]] = {
def isSafe(queens: Vector[(Int, Int)], queen: Int): Boolean = {
!invalidSum.contains(queens.size + queen) && !invalidMinus.contains(queens.size - queen)
}
if (solution.size == boardSize)
Stream(solution.toList)
else {
for {
nextQueen <- remainingElements.toStream if isSafe(solution, nextQueen)
res <- iterate(solution :+(solution.size, nextQueen), remainingElements - nextQueen, invalidSum + (solution.size + nextQueen), invalidMinus + (solution.size - nextQueen))
} yield (res)
}
}
val futureExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(20))
val promise = Promise[List[Int]]()
Future({
val allElements = (0 until boardSize).toSet
// HERE we parallelize the execution
val range = (0 until boardSize).par
range.foreach(pos => {
promise.trySuccess(iterate(Vector((0, pos)), allElements - pos, HashSet(pos), HashSet(-pos)).map(_.map(_._2)).head)
}
)
})(futureExecutor)
promise.future
}
val resFuture = firstResult()
resFuture.onSuccess({ case res =>
println("Finished in: " + (System.currentTimeMillis() - time))
println(res)
System.exit(0)
})(scala.concurrent.ExecutionContext.Implicits.global)
Await.result(Promise().future, Duration.Inf)
}
如您所见,我引入了单独的执行程序来等待结果和计算您的未来。为了使它更明显,我将它们显式化了,但当然你可以使用隐式。
你的第二个代码片段中的问题是线程池在默认执行上下文中耗尽 (scala.concurrent.ExecutionContext.Implicits.global
),因此你的承诺将在几乎所有计算完成之前不会触发。
ParRange
默认使用全局上下文 TaskSupport
:
//...
private[parallel] def getTaskSupport: TaskSupport = new ExecutionContextTaskSupport
//...
class ExecutionContextTaskSupport(val environment: ExecutionContext = scala.concurrent.ExecutionContext.global)
extends TaskSupport with ExecutionContextTasks