scala futures 顺序惰性执行

scala futures sequential lazy execution

我有 iterable 个期货,每个 return 都是一个序列:Iterable[Future[Seq[Int]]] 因此,我需要一个序列 [=21] =]来自期货:Seq[Int]

问题是我只需要结果序列的前 n 个元素,所以我并不总是需要执行所有的期货。我也事先不知道需要执行多少个期货才能实现(也许第一个 return 就足够了,也许我们必须全部执行)。

显然,我需要按顺序执行我的功能。我可以做 foreach 和 break/return,但我想用函数式风格来表达它。

以下似乎有效并且 'looks' 功能正常。我对这将如何实际执行或在幕后如何行动知之甚少。

这可以很容易地插入到 def 中,其中硬编码的 4 被传递给一个参数。

我想我应该 return 5 个元素,即使只要求 4 个元素,因为对中期未来的评估需要以任何一种方式进行。删除多余的元素应该很简单。

val f = Iterable(Future(Seq(1,2,3)), Future(Seq(4,5)), Future(Seq(6,7,8)))

val output = f.foldLeft(Future(Seq.empty[Int])){(previous, next) =>
    previous.flatMap{pSeq =>
        if(pSeq.length >= 4) {
            Future(pSeq)
        } else {
            next.map(nSeq => pSeq ++ nSeq)
        }
    }
}

println(Await.result(output, Duration.Inf)) //List(1,2,3,4,5)

我不喜欢的一点是将 pSeq 包装在 Future 中只是为了保持一致的类型。

编辑:只是对维克托回答的回应(我无法发表评论,因为没有足够高的代表,它稍微增加了我的回答的价值)。

尽管 Viktor 的回答更容易阅读,但它必须等待所有 Future 完成,即使它们不是必需的。

例如,将我的 f 替换为以下内容:

val f = Iterable(Future(Seq(1,2,3)), Future(Seq(4,5)), Future(throw new Exception))

它仍然有效,Viktor 的调用 Future.sequence 将 Iterable[Future[]] 转换为 Future[Iterable[]],因此所有这些都必须是完成。

要么使用 Future.fold:

scala> import scala.concurrent._

scala> import ExecutionContext.Implicits.global

scala> Future.fold(Iterable(Future(Seq(1,2,3)), Future(Seq(4,5)), Future(Seq(6,7,8))))(Seq.empty[Int])( (prev, cur) => if(prev.size >= 4) prev else prev ++ cur) foreach println

List(1, 2, 3, 4, 5)

scala>

或者您查看 Future.fold 的实现方式并添加退出条件。 (本质上是一个 foldUntil)