Future.sequence 的片状 onSuccess

Flaky onSuccess of Future.sequence

我写了这个方法:

import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.util.{ Success, Failure }

object FuturesSequence extends App {
  val f1 = future {
    1
  }

  val f2 = future {
    2
  }

  val lf = List(f1, f2)

  val seq = Future.sequence(lf)

  seq.onSuccess {
    case l => println(l)
  }
}

我期待 Future.sequence 将 List[Future] 收集到 Future[List] 中,然后等待每个 future(在我的例子中是 f1 和 f2)完成,然后再调用 Future[List 上的 onSuccess ] seq 在我的例子中。

但是在多次运行这段代码之后,它偶尔只打印一次 "List(1, 2)",我不明白为什么它没有按预期工作。

试试这个,

import scala.concurrent._
import java.util.concurrent.Executors
import scala.util.{ Success, Failure }

object FuturesSequence extends App {
  implicit val exec = ExecutionContext.fromExecutor(Executors.newCachedThreadPool)
  val f1 = future {
    1
  }

  val f2 = future {
    2
  }

  val lf = List(f1, f2)

  val seq = Future.sequence(lf)

  seq.onSuccess {
    case l => println(l)
  }
}

这将始终打印 List(1,2)。原因很简单,上面的 exec 是线程(不是守护线程)的 ExecutionContext,在您的示例中,ExecutionContext 是从 ExecutionContext.Implicits.global 中隐式获取的默认线程,其中包含守护线程。

因此作为守护进程,进程不会等待 seq 未来完成并终止。如果 seq 确实完成了,那么它就会打印出来。但这并不总是发生

应用程序在未来完成之前退出。

You need to block until the future has completed。这可以通过多种方式实现,包括更改 ExecutionContext、实例化新的线程池、Thread.sleep 等,或使用 scala.concurrent.Await

上的方法

最简单的代码方法是使用 Await.ready。这会在 future 上阻塞一段指定的时间。在下面修改后的代码中,应用程序在退出前等待 5 秒。

另请注意,额外的导入 scala.concurrent.duration 因此我们可以指定等待时间。

import scala.concurrent._
import scala.concurrent.duration._
import java.util.concurrent.Executors
import scala.util.{ Success, Failure }

object FuturesSequence extends App {
  val f1 = future {
    1
  }

  val f2 = future {
    2
  }

  val lf = List(f1, f2)

  val seq = Future.sequence(lf)

  seq.onSuccess {
    case l => println(l)
  }

  Await.ready(seq, 5 seconds)
}

通过使用 Await.result,您也可以跳过 onSuccess 方法,因为它会 return 结果列表给您。

示例:

val seq: List[Int] = Await.result(Future.sequence(lf), 5 seconds)
println(seq)