Scala:加入/等待越来越多的期货队列
Scala: joining / waiting for growing queue of futures
我启动了几个异步进程,如果需要,这些进程又可以启动更多进程(想想遍历目录结构或类似的东西)。每个过程 returns 一些东西,最后我想等待所有这些过程的完成并安排一个函数来对结果集合做一些事情。
天真的尝试
我的解决方案尝试使用了一个可变的 ListBuffer
(我一直在其中添加我产生的期货),并且 Future.sequence
在所有这些期货完成时将一些功能安排到 运行列在此缓冲区中。
我准备了一个最简单的例子来说明这个问题:
object FuturesTest extends App {
var queue = ListBuffer[Future[Int]]()
val f1 = Future {
Thread.sleep(1000)
val f3 = Future {
Thread.sleep(2000)
Console.println(s"f3: 1+2=3 sec; queue = $queue")
3
}
queue += f3
Console.println(s"f1: 1 sec; queue = $queue")
1
}
val f2 = Future {
Thread.sleep(2000)
Console.println(s"f2: 2 sec; queue = $queue")
2
}
queue += f1
queue += f2
Console.println(s"starting; queue = $queue")
Future.sequence(queue).foreach(
(all) => Console.println(s"Future.sequence finished with $all")
)
Thread.sleep(5000) // simulates app being alive later
}
它首先安排 f1
和 f2
期货,然后 f3
将在 1 秒后的 f1
分辨率中安排。 f3
本身将在 2 秒后解决。因此,我希望得到的是:
starting; queue = ListBuffer(Future(<not completed>), Future(<not completed>))
f1: 1 sec; queue = ListBuffer(Future(<not completed>), Future(<not completed>), Future(<not completed>))
f2: 2 sec; queue = ListBuffer(Future(Success(1)), Future(<not completed>), Future(<not completed>))
f3: 1+2=3 sec; queue = ListBuffer(Future(Success(1)), Future(Success(2)), Future(<not completed>))
Future.sequence finished with ListBuffer(1, 2, 3)
然而,我实际上得到:
starting; queue = ListBuffer(Future(<not completed>), Future(<not completed>))
f1: 1 sec; queue = ListBuffer(Future(<not completed>), Future(<not completed>), Future(<not completed>))
f2: 2 sec; queue = ListBuffer(Future(Success(1)), Future(<not completed>), Future(<not completed>))
Future.sequence finished with ListBuffer(1, 2)
f3: 1+2=3 sec; queue = ListBuffer(Future(Success(1)), Future(Success(2)), Future(<not completed>))
...这很可能是因为我们等待的期货列表在 Future.sequence
的初始调用期间是固定的,以后不会改变。
工作,但丑陋的尝试
最终,我用这段代码让它按照我想要的方式运行:
waitForSequence(queue, (all: ListBuffer[Int]) => Console.println(s"finished with $all"))
def waitForSequence[T](queue: ListBuffer[Future[T]], act: (ListBuffer[T] => Unit)): Unit = {
val seq = Future.sequence(queue)
seq.onComplete {
case Success(res) =>
if (res.size < queue.size) {
Console.println("... still waiting for tasks")
waitForSequence(queue, act)
} else {
act(res)
}
case Failure(exc) =>
throw exc
}
}
这按预期工作,最终获得所有 3 个期货:
starting; queue = ListBuffer(Future(<not completed>), Future(<not completed>))
f1: 1 sec; queue = ListBuffer(Future(<not completed>), Future(<not completed>), Future(<not completed>))
f2: 2 sec; queue = ListBuffer(Future(Success(1)), Future(<not completed>), Future(<not completed>))
... still waiting for tasks
f3: 1+2=3 sec; queue = ListBuffer(Future(Success(1)), Future(Success(2)), Future(<not completed>))
finished with ListBuffer(1, 2, 3)
但是还是很丑。如果在完成时发现队列比结果数长,它就会重新开始 Future.sequence
等待,希望下次完成时情况会好一些。当然,这很糟糕,因为它耗尽了堆栈,如果此检查将在创建未来和将其附加到队列之间的微小 window 中触发,则可能容易出错。
是否可以在不使用 Akka 重写所有内容或使用 Await.result
的情况下这样做(I can't actually use 因为我的代码是为 Scala.js 编译的)。
执行此操作的正确方法可能是组合您的 Futures。具体来说,f1 不应该只是启动 f3,它可能应该 flatMap 覆盖它——也就是说,f1 的 Future 在 f3 解析之前不会解析。
请记住,Future.sequence
是一种后备选项,仅当 Futures 都真正断开连接时才使用。在您所描述的情况下,存在真正的依赖关系,这些在您实际返回的 Futures 中得到了最好的体现。使用 Futures 时,flatMap 是你的朋友,应该是你最先使用的工具之一。 (通常但不总是 for
理解。)
可以肯定地说,如果您想要一个可变的 Futures 队列,代码的结构不正确,并且有更好的方法来实现。特别是在 Scala.js(这是我的大部分代码所在的地方,而且非常依赖 Future),我经常 constantly 用于理解这些 Futures——我认为这是只有理智的操作方式...
就像贾斯汀提到的,你不能失去对其他期货内部产生的期货的引用,你应该使用 map 和 flatMap 来链接它们。
val f1 = Future {
Thread.sleep(1000)
val f3 = Future {
Thread.sleep(2000)
Console.println(s"f3: 1+2=3 sec")
3
}
f3.map{
r =>
Console.println(s"f1: 1 sec;")
Seq(1, r)
}
}.flatMap(identity)
val f2 = Future {
Thread.sleep(2000)
Console.println(s"f2: 2 sec;")
Seq(2)
}
val futures = Seq(f1, f2)
Future.sequence(futures).foreach(
(all) => Console.println(s"Future.sequence finished with ${all.flatten}")
)
Thread.sleep(5000) // simulates app being alive later
这适用于最小示例,我不确定它是否适用于您的实际用例。结果是:
f2: 2 sec;
f3: 1+2=3 sec
f1: 1 sec;
Future.sequence finished with List(1, 3, 2)
我不会涉及 Future.sequence
:它并行化操作,您似乎正在寻找顺序异步执行。此外,您可能不需要期货在定义后立即开始。组合应该看起来像这样:
def run[T](queue: List[() => Future[T]]): Future[List[T]] = {
(Future.successful(List.empty[T]) /: queue)(case (f1, f2) =>
f1() flatMap (h => )
)
val t0 = now
def f(n: Int): () => Future[String] = () => {
println(s"starting $n")
Future[String] {
Thread.sleep(100*n)
s"<<$n/${now - t0}>>"
}
}
println(Await.result(run(f(7)::f(10)::f(20)::f(3)::Nil), 20 seconds))
诀窍是不要过早地推出期货;这就是为什么我们有 f(n)
在我们用 ()
.
调用它之前不会启动的原因
我启动了几个异步进程,如果需要,这些进程又可以启动更多进程(想想遍历目录结构或类似的东西)。每个过程 returns 一些东西,最后我想等待所有这些过程的完成并安排一个函数来对结果集合做一些事情。
天真的尝试
我的解决方案尝试使用了一个可变的 ListBuffer
(我一直在其中添加我产生的期货),并且 Future.sequence
在所有这些期货完成时将一些功能安排到 运行列在此缓冲区中。
我准备了一个最简单的例子来说明这个问题:
object FuturesTest extends App {
var queue = ListBuffer[Future[Int]]()
val f1 = Future {
Thread.sleep(1000)
val f3 = Future {
Thread.sleep(2000)
Console.println(s"f3: 1+2=3 sec; queue = $queue")
3
}
queue += f3
Console.println(s"f1: 1 sec; queue = $queue")
1
}
val f2 = Future {
Thread.sleep(2000)
Console.println(s"f2: 2 sec; queue = $queue")
2
}
queue += f1
queue += f2
Console.println(s"starting; queue = $queue")
Future.sequence(queue).foreach(
(all) => Console.println(s"Future.sequence finished with $all")
)
Thread.sleep(5000) // simulates app being alive later
}
它首先安排 f1
和 f2
期货,然后 f3
将在 1 秒后的 f1
分辨率中安排。 f3
本身将在 2 秒后解决。因此,我希望得到的是:
starting; queue = ListBuffer(Future(<not completed>), Future(<not completed>))
f1: 1 sec; queue = ListBuffer(Future(<not completed>), Future(<not completed>), Future(<not completed>))
f2: 2 sec; queue = ListBuffer(Future(Success(1)), Future(<not completed>), Future(<not completed>))
f3: 1+2=3 sec; queue = ListBuffer(Future(Success(1)), Future(Success(2)), Future(<not completed>))
Future.sequence finished with ListBuffer(1, 2, 3)
然而,我实际上得到:
starting; queue = ListBuffer(Future(<not completed>), Future(<not completed>))
f1: 1 sec; queue = ListBuffer(Future(<not completed>), Future(<not completed>), Future(<not completed>))
f2: 2 sec; queue = ListBuffer(Future(Success(1)), Future(<not completed>), Future(<not completed>))
Future.sequence finished with ListBuffer(1, 2)
f3: 1+2=3 sec; queue = ListBuffer(Future(Success(1)), Future(Success(2)), Future(<not completed>))
...这很可能是因为我们等待的期货列表在 Future.sequence
的初始调用期间是固定的,以后不会改变。
工作,但丑陋的尝试
最终,我用这段代码让它按照我想要的方式运行:
waitForSequence(queue, (all: ListBuffer[Int]) => Console.println(s"finished with $all"))
def waitForSequence[T](queue: ListBuffer[Future[T]], act: (ListBuffer[T] => Unit)): Unit = {
val seq = Future.sequence(queue)
seq.onComplete {
case Success(res) =>
if (res.size < queue.size) {
Console.println("... still waiting for tasks")
waitForSequence(queue, act)
} else {
act(res)
}
case Failure(exc) =>
throw exc
}
}
这按预期工作,最终获得所有 3 个期货:
starting; queue = ListBuffer(Future(<not completed>), Future(<not completed>))
f1: 1 sec; queue = ListBuffer(Future(<not completed>), Future(<not completed>), Future(<not completed>))
f2: 2 sec; queue = ListBuffer(Future(Success(1)), Future(<not completed>), Future(<not completed>))
... still waiting for tasks
f3: 1+2=3 sec; queue = ListBuffer(Future(Success(1)), Future(Success(2)), Future(<not completed>))
finished with ListBuffer(1, 2, 3)
但是还是很丑。如果在完成时发现队列比结果数长,它就会重新开始 Future.sequence
等待,希望下次完成时情况会好一些。当然,这很糟糕,因为它耗尽了堆栈,如果此检查将在创建未来和将其附加到队列之间的微小 window 中触发,则可能容易出错。
是否可以在不使用 Akka 重写所有内容或使用 Await.result
的情况下这样做(I can't actually use 因为我的代码是为 Scala.js 编译的)。
执行此操作的正确方法可能是组合您的 Futures。具体来说,f1 不应该只是启动 f3,它可能应该 flatMap 覆盖它——也就是说,f1 的 Future 在 f3 解析之前不会解析。
请记住,Future.sequence
是一种后备选项,仅当 Futures 都真正断开连接时才使用。在您所描述的情况下,存在真正的依赖关系,这些在您实际返回的 Futures 中得到了最好的体现。使用 Futures 时,flatMap 是你的朋友,应该是你最先使用的工具之一。 (通常但不总是 for
理解。)
可以肯定地说,如果您想要一个可变的 Futures 队列,代码的结构不正确,并且有更好的方法来实现。特别是在 Scala.js(这是我的大部分代码所在的地方,而且非常依赖 Future),我经常 constantly 用于理解这些 Futures——我认为这是只有理智的操作方式...
就像贾斯汀提到的,你不能失去对其他期货内部产生的期货的引用,你应该使用 map 和 flatMap 来链接它们。
val f1 = Future {
Thread.sleep(1000)
val f3 = Future {
Thread.sleep(2000)
Console.println(s"f3: 1+2=3 sec")
3
}
f3.map{
r =>
Console.println(s"f1: 1 sec;")
Seq(1, r)
}
}.flatMap(identity)
val f2 = Future {
Thread.sleep(2000)
Console.println(s"f2: 2 sec;")
Seq(2)
}
val futures = Seq(f1, f2)
Future.sequence(futures).foreach(
(all) => Console.println(s"Future.sequence finished with ${all.flatten}")
)
Thread.sleep(5000) // simulates app being alive later
这适用于最小示例,我不确定它是否适用于您的实际用例。结果是:
f2: 2 sec;
f3: 1+2=3 sec
f1: 1 sec;
Future.sequence finished with List(1, 3, 2)
我不会涉及 Future.sequence
:它并行化操作,您似乎正在寻找顺序异步执行。此外,您可能不需要期货在定义后立即开始。组合应该看起来像这样:
def run[T](queue: List[() => Future[T]]): Future[List[T]] = {
(Future.successful(List.empty[T]) /: queue)(case (f1, f2) =>
f1() flatMap (h => )
)
val t0 = now
def f(n: Int): () => Future[String] = () => {
println(s"starting $n")
Future[String] {
Thread.sleep(100*n)
s"<<$n/${now - t0}>>"
}
}
println(Await.result(run(f(7)::f(10)::f(20)::f(3)::Nil), 20 seconds))
诀窍是不要过早地推出期货;这就是为什么我们有 f(n)
在我们用 ()
.