Control main thread with multiple Future jobs in Scala

def fixture =
    new {
      val etlSparkLayer = new XYZ(spark) // renamed from `xyz` to match the `fixture.etlSparkLayer` usage below
    }

val fList: scala.collection.mutable.MutableList[scala.concurrent.Future[Dataset[Row]]] =
  scala.collection.mutable.MutableList[scala.concurrent.Future[Dataset[Row]]]() // mutable list of Futures, i.e. List[Future[...]]

    test("test case") {
        for (i <- 1 to 10) {
          fList += Future { // `fList ++ ...` discarded its result; `+=` actually appends
            println("Executing task " + i)
            val ds = read(fixture.etlSparkLayer, i)
            ds
          }
        }

        Thread.sleep(1000 * 4200) // crude main-thread wait; this is the workaround I want to replace
        val futureOfList = Future.sequence(fList) // collapse the list of Futures into a single Future of a list
        println(Await.ready(futureOfList, Duration.Inf))


        val await_result: Seq[Dataset[Row]] = Await.result(futureOfList, Duration.Inf)
        println("Squares: " + await_result)

        futureOfList.onComplete {
          case Success(x) => println("Success!!! " + x)
          case Failure(ex) => println("Failed !!! " + ex)
        }               
      }

I am running a test case that builds a list of Futures and wraps it with Future.sequence. I am trying to execute the same function several times in parallel using Futures in Scala. On my system only 4 jobs start at a time; once those 4 finish, the next 4 start, and so on until all the jobs are done. How can I start more than 4 jobs at once, and how should the main thread wait for all the Futures to complete? I tried Await.result and Await.ready but could not control the main thread with them, so for main-thread control I fell back on Thread.sleep. The program reads from an RDBMS table and writes to Elasticsearch. So how do I control the main thread (the main issue)?

Assuming you are using the scala.concurrent.ExecutionContext.Implicits.global ExecutionContext, you can tune the number of threads as described here:

https://github.com/scala/scala/blob/2.12.x/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L100

In particular, the following system properties: scala.concurrent.context.minThreads, scala.concurrent.context.numThreads, scala.concurrent.context.maxThreads, and scala.concurrent.context.maxExtraThreads.
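As a sketch, these properties can be passed as JVM flags when launching the test run. The thread counts and the `sbt` invocation below are arbitrary examples; adapt them to your build tool and runner:

```shell
# Hypothetical invocation: size the global ExecutionContext to 16 threads
# instead of the default (number of CPU cores).
sbt -Dscala.concurrent.context.minThreads=16 \
    -Dscala.concurrent.context.numThreads=16 \
    -Dscala.concurrent.context.maxThreads=16 \
    test
```

Note that these properties are read when the global ExecutionContext is first initialized, so they must be set at JVM startup, not from code that runs after a Future has already been scheduled.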

Otherwise, you can rewrite your code to something like this:

import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent._
import java.util.concurrent.Executors

test("test case") { 
  implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(NUMBEROFTHREADSYOUWANT)) // `Executors`, not `ExecutorService`, has the factory method
  val aFuture = Future.traverse(1 to 10) {
    i => Future {
      println("Executing task " + i)
      read(fixture.etlSparkLayer,i) // If this is a blocking operation you may want to consider wrapping it in a `blocking {}`-block.          
    }
  }
  aFuture.onComplete(_ => ec.shutdownNow()) // Only for this test, and to make sure the pool gets cleaned up
  val await_result: immutable.Seq[Dataset[Row]] = Await.result(aFuture, 60.minutes) // Or other timeout
  println("Squares: " + await_result) 
}
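If `read` blocks on JDBC or Elasticsearch I/O, the `blocking {}` hint mentioned in the comment above is worth spelling out. A minimal, self-contained sketch (with `Thread.sleep` standing in for the real blocking read, and `BlockingDemo`/`runAll` as hypothetical names) might look like:

```scala
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object BlockingDemo {
  // Runs `n` simulated blocking reads in parallel. Wrapping the blocking
  // call in `blocking {}` tells the global ForkJoinPool it may grow past
  // its default size (number of cores) instead of starving on blocked threads.
  def runAll(n: Int): Seq[Int] = {
    val futures = (1 to n).map { i =>
      Future {
        blocking {
          Thread.sleep(50) // stand-in for a blocking JDBC/Elasticsearch read
          i * i
        }
      }
    }
    // The main thread parks here until every Future has completed.
    Await.result(Future.sequence(futures), 1.minute)
  }
}
```

Here `Await.result` plays the role of the main-thread barrier, replacing the `Thread.sleep` workaround from the question.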