How to stop a function from returning until all the Futures in it have completed in Scala?

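I'm trying to process some files asynchronously, and I want to be able to choose how many threads the program uses. I also want to wait until processFiles() has finished processing all of the files. So I'm looking for a way to keep the function from returning until all the Futures have finished executing. Any ideas would be very helpful. Here is my sample code.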

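import java.io.File
import java.util.concurrent.Executors

import scala.concurrent.{ExecutionContext, Future}
import scala.io.Source

object FileReader {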

    def processFiles(files: Array[File]) = {
        val execService = Executors.newFixedThreadPool(5)
        implicit val execContext = ExecutionContext.fromExecutorService(execService)

        val processed = files.map { f =>
            Future {
                val name = f.getAbsolutePath()
                val fp = Source.fromFile(name)
                var data = ""
                fp.getLines().foreach(x => {
                    data = data ++ s"$x\n"
                })
                fp.close()
                // process the data.
                println("Processing ....")
                data
            }
        }
        execContext.shutdown()

    }

    def main(args: Array[String]): Unit = {
        println("Start")


        val tmp = new File("/path/to/files")
        val files = tmp.listFiles()
        val result = processFiles(files)
        println("done processing")
        println("done work")

    }

}

I suspect I may be using Future incorrectly here. If so, please correct me.

My expected output:

Start
Processing ....
Processing ....
Processing ....
Processing ....
Processing ....
Processing ....
Processing ....
Processing ....
Processing ....
Processing ....
done processing
done work

My current output:

Start
done processing
done work
Processing ....
Processing ....
Processing ....
Processing ....
Processing ....
Processing ....
Processing ....
Processing ....
Processing ....
Processing ....
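Why the output comes out in this order: Future { ... } only hands its body to the executor and returns immediately, and ExecutorService.shutdown() stops accepting new tasks without waiting for the ones already submitted. So processFiles returns, and main prints, while the files are still being read. Below is a minimal sketch of that behaviour. The file reading is replaced by a CountDownLatch (an assumption, used only to make the timing deterministic), and the object name FutureReturnsImmediately is hypothetical:

```scala
import java.util.concurrent.{CountDownLatch, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical demo object: shows that Future.apply and shutdown() both return
// without waiting for the submitted work to finish.
object FutureReturnsImmediately {
  def demo(): (Boolean, String) = {
    val pool = Executors.newFixedThreadPool(1)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val gate = new CountDownLatch(1)

    // The body is submitted to the pool; the call itself returns at once.
    val f = Future { gate.await(); "processed" }
    // The body is still blocked on the latch, so this is always false.
    val doneRightAway = f.isCompleted

    // shutdown() does not block either: the submitted body keeps running.
    pool.shutdown()
    gate.countDown() // let the body finish

    // Only an explicit wait makes the caller see the result.
    (doneRightAway, Await.result(f, 10.seconds))
  }

  def main(args: Array[String]): Unit =
    println(demo()) // prints (false,processed)
}
```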

You need to use Future.traverse to combine the Futures for the individual files into a single Future, and then call Await.result on it:

import java.io.File
import java.util.concurrent.Executors

import scala.io.Source
import scala.concurrent._
import scala.concurrent.duration._

object FileReader {

  def processFiles(files: Array[File]): List[String] = {
    val execService = Executors.newFixedThreadPool(5)
    implicit val execContext: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(execService)

    try {
      // Turn `List[File]` into a single `Future[List[String]]`,
      // one Future per file, all running on the 5-thread pool
      val processed = Future.traverse(files.toList) { f =>
        Future {
          val name = f.getAbsolutePath()
          val fp = Source.fromFile(name)
          var data = ""
          fp.getLines()
            .foreach(x => {
              data = data ++ s"$x\n"
            })
          fp.close()
          // process the data.
          println("Processing ....")
          data
        }
      }
      //TODO: Put proper timeout here
      //Execution will be blocked until all futures completed
      Await.result(processed, 30.minutes)
    } finally {
      // Shut the pool down even if a file fails or the timeout expires;
      // otherwise its non-daemon threads keep the JVM running
      execContext.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    println("Start")

    val tmp = new File(
      "/path/to/file"
    )
    val files = tmp.listFiles()
    val result = processFiles(files)
    println("done processing")
    println("done work")

  }
}
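The same Future.traverse + Await.result pattern can be pulled out into a small generic helper that also makes the thread count a parameter, as the question asked. This is a sketch under assumptions: processAll, numThreads and work are hypothetical names, and in-memory strings stand in for files so the example runs without a directory:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object TraverseAndAwait {
  // Hypothetical helper: runs `work` on every input using `numThreads` threads
  // and blocks until all results are available (or the timeout expires).
  def processAll[A, B](inputs: Seq[A], numThreads: Int, timeout: FiniteDuration)(
      work: A => B): Seq[B] = {
    val pool = Executors.newFixedThreadPool(numThreads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // Seq[A] => Future[Seq[B]]: one Future per input, combined into one.
      val all: Future[Seq[B]] = Future.traverse(inputs)(a => Future(work(a)))
      // Blocks the calling thread; throws a TimeoutException on timeout,
      // and rethrows a task's exception if any task failed.
      Await.result(all, timeout)
    } finally {
      // Runs even if Await.result throws, so the pool threads never keep the JVM alive.
      pool.shutdown()
    }
  }
}
```

Future.traverse keeps the results in input order regardless of which task finishes first, so processAll(Seq("a", "bb", "ccc"), 2, 10.seconds)(_.length) returns Seq(1, 2, 3). With real files, work would be the body of the per-file Future in the answer above.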

Thanks @Ivan Kurchenko, that solution worked. I'm posting the final version of the working code.

import java.io.File
import java.util.concurrent.Executors

import scala.concurrent._
import scala.concurrent.duration._
import scala.io.Source

object FileReader {

    def processFiles(files: Seq[File]): Seq[File] = {
        val execService = Executors.newFixedThreadPool(5)
        implicit val execContext: ExecutionContextExecutorService =
            ExecutionContext.fromExecutorService(execService)
        try {
            // Turn `Seq[File]` into a single `Future[Seq[File]]`
            val processed = Future.traverse(files) {
                f =>
                    Future {
                        val name = f.getAbsolutePath()
                        val fp = Source.fromFile(name)
                        var data = ""
                        fp.getLines()
                            .foreach(x => {
                                data = data ++ s"$x\n"
                            })
                        fp.close()
                        // process the data.
                        println("Processing ....")
                        f
                    }
            }
            // TODO: Put proper timeout here
            // Execution will be blocked until all futures completed
            Await.result(processed, 30.minutes)
        } finally {
            // Always release the pool, even on failure or timeout
            execContext.shutdown()
        }
    }

    def main(args: Array[String]): Unit = {
        println("Start")

        val tmp = new File(
            "/path/to/file"
        )
        val files = tmp.listFiles.toSeq
        val result = processFiles(files)


        println("done processing")
        println("done work")

    }
}
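If the per-file results are not needed, another option (a different technique from the one in this thread) is to let the thread pool do the waiting: call shutdown() after submitting the tasks, then awaitTermination, which blocks until every submitted task has finished or the timeout expires. A sketch, assuming the hypothetical names runAll and work. Note that an exception thrown by work is captured inside its Future and never rethrown here, which is one reason the Future.traverse + Await.result version is usually the better fit:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical alternative: wait on the executor instead of on the Futures.
object AwaitTerminationSketch {
  // Runs `work` for every input and returns how many calls completed normally.
  def runAll[A](inputs: Seq[A], numThreads: Int)(work: A => Unit): Int = {
    val pool = Executors.newFixedThreadPool(numThreads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val completed = new AtomicInteger(0)

    inputs.foreach { a =>
      // A failure inside this Future is swallowed: nothing is rethrown to the caller.
      Future { work(a); completed.incrementAndGet() }
    }

    pool.shutdown() // reject new tasks; already-submitted ones keep running
    // Blocks until all submitted tasks have finished, or the timeout expires.
    if (!pool.awaitTermination(30, TimeUnit.MINUTES))
      throw new IllegalStateException("tasks did not finish within the timeout")
    completed.get()
  }
}
```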