Control main thread with multiple Future jobs in Scala
def fixture =
  new {
    val xyz = new XYZ(spark)
  }

// A mutable list of Futures, i.e. a List[Future[Dataset[Row]]]
val fList: scala.collection.mutable.MutableList[scala.concurrent.Future[Dataset[Row]]] =
  scala.collection.mutable.MutableList[scala.concurrent.Future[Dataset[Row]]]()
test("test case") {
  for (i <- 1 to 10) {
    fList += Future { // note: `fList ++ ...` would discard the result; `+=` actually appends
      println("Executing task " + i)
      val ds = read(fixture.etlSparkLayer, i)
      ds
    }
  }
  Thread.sleep(1000 * 4200)
  val futureOfList = Future.sequence(fList) // turn the list of Futures into one Future of a list
  println(Await.ready(futureOfList, Duration.Inf))
  val await_result: Seq[Dataset[Row]] = Await.result(futureOfList, Duration.Inf)
  println("Squares: " + await_result)
  futureOfList.onComplete {
    case Success(x)  => println("Success!!! " + x)
    case Failure(ex) => println("Failed !!! " + ex)
  }
}
I am running a test case that builds a list of Futures and combines them with Future.sequence. I am trying to run the same function several times in parallel using Futures in Scala, but on my system only 4 jobs start at a time; once those 4 finish, the next 4 start, and so on until all jobs are done. How can I start more than 4 jobs at once, and how should the main thread wait until all the Future threads have finished? I tried Await.result and Await.ready but could not control the main thread with them, so for main-thread control I resorted to Thread.sleep. This program reads from an RDBMS table and writes to Elasticsearch. So how do I control the main thread? That is the main issue.
Assuming you are using the scala.concurrent.ExecutionContext.Implicits.global ExecutionContext, you can tune the number of threads as described in the ExecutionContext documentation, in particular via the following system properties: scala.concurrent.context.minThreads, scala.concurrent.context.numThreads, scala.concurrent.context.maxThreads, and scala.concurrent.context.maxExtraThreads.

Otherwise, you can rewrite your code to something like this:
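For example, these properties can be passed as JVM flags when launching the test run; the thread count of 16 below is purely illustrative, and the `-J` prefix assumes you are launching through sbt:

```shell
# Raise the global ExecutionContext's pool to 16 threads (example value)
sbt -J-Dscala.concurrent.context.minThreads=16 \
    -J-Dscala.concurrent.context.numThreads=16 \
    -J-Dscala.concurrent.context.maxThreads=16 \
    test
```

Note that the global context caps its effective size at maxThreads, so raising numThreads alone is not enough.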
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent._
import java.util.concurrent.Executors

test("test case") {
  // Executors.newFixedThreadPool (not ExecutorService.newFixedThreadPool) creates the pool
  implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(NUMBEROFTHREADSYOUWANT))
  val aFuture = Future.traverse(1 to 10) { i =>
    Future {
      println("Executing task " + i)
      read(fixture.etlSparkLayer, i) // if this is a blocking operation, consider wrapping it in a `blocking {}` block
    }
  }
  aFuture.onComplete(_ => ec.shutdownNow()) // only for this test, and to make sure the pool gets cleaned up
  val await_result: immutable.Seq[Dataset[Row]] = Await.result(aFuture, 60.minutes) // or another timeout
  println("Squares: " + await_result)
}
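Stripped of the Spark specifics, the pattern above can be sketched as a self-contained script; the pool size of 4, the Thread.sleep, and the squaring task are illustrative stand-ins for the real read(...) workload:

```scala
import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration._

// A fixed pool of 4 threads: at most 4 Futures execute at once; the rest queue up.
implicit val ec: ExecutionContextExecutorService =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

// Future.traverse starts one Future per element and returns a single
// Future of all results (in input order) for the main thread to wait on.
val all: Future[Seq[Int]] = Future.traverse(1 to 10) { i =>
  Future {
    Thread.sleep(50) // stand-in for a real blocking read
    i * i
  }
}

// Await.result blocks the main thread until every task is done -- no Thread.sleep guessing.
val results: Seq[Int] = Await.result(all, 1.minute)
println(results.mkString(","))
ec.shutdown()
```

Changing the pool size passed to newFixedThreadPool is all it takes to run more (or fewer) jobs at once, and the single Await.result replaces both the sleep and the manual list bookkeeping.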