scala futures 顺序惰性执行
scala futures sequential lazy execution
我有 iterable
个期货,每个 return 都是一个序列:Iterable[Future[Seq[Int]]]
因此,我需要一个序列 [=21] =]来自期货:Seq[Int]
。
问题是我只需要结果序列的前 n
个元素,所以我并不总是需要执行所有的期货。我也事先不知道需要执行多少个期货才能实现(也许第一个 return 就足够了,也许我们必须全部执行)。
显然,我需要按顺序执行我的功能。我可以做 foreach 和 break/return,但我想用函数式风格来表达它。
以下似乎有效并且 'looks' 功能正常。我对这将如何实际执行或在幕后如何行动知之甚少。
这可以很容易地插入到 def 中,其中硬编码的 4 被传递给一个参数。
我想我应该 return 5 个元素,即使只要求 4 个元素,因为对中期未来的评估需要以任何一种方式进行。删除多余的元素应该很简单。
val f = Iterable(Future(Seq(1,2,3)), Future(Seq(4,5)), Future(Seq(6,7,8)))
val output = f.foldLeft(Future(Seq.empty[Int])){(previous, next) =>
previous.flatMap{pSeq =>
if(pSeq.length >= 4) {
Future(pSeq)
} else {
next.map(nSeq => pSeq ++ nSeq)
}
}
}
println(Await.result(output, Duration.Inf)) //List(1,2,3,4,5)
我不喜欢的一点是将 pSeq 包装在 Future 中只是为了保持一致的类型。
编辑:只是对维克托回答的回应(我无法发表评论,因为没有足够高的代表,它稍微增加了我的回答的价值)。
尽管 Viktor 的回答更容易阅读,但它必须等待所有 Future 完成,即使它们不是必需的。
例如,将我的 f 替换为以下内容:
val f = Iterable(Future(Seq(1,2,3)), Future(Seq(4,5)), Future(throw new Exception))
它仍然有效,Viktor 的调用 Future.sequence 将 Iterable[Future[]] 转换为 Future[Iterable[]],因此所有这些都必须是完成。
要么使用 Future.fold:
scala> import scala.concurrent._
scala> import ExecutionContext.Implicits.global
scala> Future.fold(Iterable(Future(Seq(1,2,3)), Future(Seq(4,5)), Future(Seq(6,7,8))))(Seq.empty[Int])( (prev, cur) => if(prev.size >= 4) prev else prev ++ cur) foreach println
List(1, 2, 3, 4, 5)
scala>
或者您查看 Future.fold 的实现方式并添加退出条件。 (本质上是一个 foldUntil)
我有 iterable
个期货,每个 return 都是一个序列:Iterable[Future[Seq[Int]]]
因此,我需要一个序列 [=21] =]来自期货:Seq[Int]
。
问题是我只需要结果序列的前 n
个元素,所以我并不总是需要执行所有的期货。我也事先不知道需要执行多少个期货才能实现(也许第一个 return 就足够了,也许我们必须全部执行)。
显然,我需要按顺序执行我的功能。我可以做 foreach 和 break/return,但我想用函数式风格来表达它。
以下似乎有效并且 'looks' 功能正常。我对这将如何实际执行或在幕后如何行动知之甚少。
这可以很容易地插入到 def 中,其中硬编码的 4 被传递给一个参数。
我想我应该 return 5 个元素,即使只要求 4 个元素,因为对中期未来的评估需要以任何一种方式进行。删除多余的元素应该很简单。
val f = Iterable(Future(Seq(1,2,3)), Future(Seq(4,5)), Future(Seq(6,7,8)))
val output = f.foldLeft(Future(Seq.empty[Int])){(previous, next) =>
previous.flatMap{pSeq =>
if(pSeq.length >= 4) {
Future(pSeq)
} else {
next.map(nSeq => pSeq ++ nSeq)
}
}
}
println(Await.result(output, Duration.Inf)) //List(1,2,3,4,5)
我不喜欢的一点是将 pSeq 包装在 Future 中只是为了保持一致的类型。
编辑:只是对维克托回答的回应(我无法发表评论,因为没有足够高的代表,它稍微增加了我的回答的价值)。
尽管 Viktor 的回答更容易阅读,但它必须等待所有 Future 完成,即使它们不是必需的。
例如,将我的 f 替换为以下内容:
val f = Iterable(Future(Seq(1,2,3)), Future(Seq(4,5)), Future(throw new Exception))
它仍然有效,Viktor 的调用 Future.sequence 将 Iterable[Future[]] 转换为 Future[Iterable[]],因此所有这些都必须是完成。
要么使用 Future.fold:
scala> import scala.concurrent._
scala> import ExecutionContext.Implicits.global
scala> Future.fold(Iterable(Future(Seq(1,2,3)), Future(Seq(4,5)), Future(Seq(6,7,8))))(Seq.empty[Int])( (prev, cur) => if(prev.size >= 4) prev else prev ++ cur) foreach println
List(1, 2, 3, 4, 5)
scala>
或者您查看 Future.fold 的实现方式并添加退出条件。 (本质上是一个 foldUntil)