试图顺序执行期货。怎么了?
Trying to make sequential executing of futures. What is wrong?
我正在尝试按顺序执行returns未来的功能
所以,我有一个 collection
val in = Seq(1, 1, -1, -2, 3, -4, 5, 6, 7, -1, -2, -9, 1, 2, 2)
并函数处理此 collection
中的每个整数
def intToFuture(int: Int): Future[Int] = {
Future {
println(s"Running function $int")
Thread.sleep(1500)
int * 100
}
}
我需要通过部分并行处理来实现处理 collection 的逻辑。
获取前 n 个元素,将每个元素并行乘以 100,然后获取下 n 个元素并执行相同的操作...等
我所做的(在我阅读了本网站上的一些帖子之后)是,我实现了两个功能
1)处理一批计算
def processBatch(ints: Seq[Int])(f: Int => Future[Int]): Future[Seq[Int]] = {
Future.sequence(ints.map(f))
}
2) 和第二个,它适用于迭代处理
def batchTraverse(in: Seq[Int], size: Int)(f: Int => Future[Int]): Future[Seq[Int]] = {
val grs = in.grouped(size).toList
def loop(l: Seq[Seq[Int]]): Future[Seq[Int]] = {
l match {
case Nil =>
Future.successful(l.flatten)//? flatten
case head :: tail =>
println("head="+head)
processBatch(head)(f).flatMap{
s => loop(tail).map{ t =>
s.appendedAll(t)
}
}
}
}
loop(grs)
}
从
开始
val fs: Future[Seq[Int]] = batchTraverse(in, 3)(intToFuture)
fs.onComplete{
f => println(f)
}
结果只做了一次迭代,我哪里搞错了?
您的功能实际上似乎运行良好,可能发生的情况是您的程序在 future 有机会完成之前终止,因此您只看到第一次迭代。通过在您的代码中添加 await ,我能够使事情正常进行。
import scala.concurrent._
import scala.concurrent.duration._
val fs: Future[Seq[Int]] = batchTraverse(in, 3)(intToFuture)
fs.onComplete{
f => println(f)
}
Await.result(fs, Duration.Inf)
您可能希望持续时间小于 Duration.Inf
,因为这将永远等待未来完成。这样做我能够得到以下输出:
head=List(1, 1, -1)
Running function 1
Running function 1
Running function -1
head=List(-2, 3, -4)
Running function -4
Running function 3
Running function -2
head=List(5, 6, 7)
Running function 7
Running function 6
Running function 5
head=List(-1, -2, -9)
Running function -9
Running function -2
Running function -1
head=List(1, 2, 2)
Running function 2
Running function 2
Running function 1
Success(List(100, 100, -100, -200, 300, -400, 500, 600, 700, -100, -200, -900, 100, 200, 200))
我正在尝试按顺序执行returns未来的功能
所以,我有一个 collection
val in = Seq(1, 1, -1, -2, 3, -4, 5, 6, 7, -1, -2, -9, 1, 2, 2)
并函数处理此 collection
中的每个整数 def intToFuture(int: Int): Future[Int] = {
Future {
println(s"Running function $int")
Thread.sleep(1500)
int * 100
}
}
我需要通过部分并行处理来实现处理 collection 的逻辑。 获取前 n 个元素,将每个元素并行乘以 100,然后获取下 n 个元素并执行相同的操作...等
我所做的(在我阅读了本网站上的一些帖子之后)是,我实现了两个功能
1)处理一批计算
def processBatch(ints: Seq[Int])(f: Int => Future[Int]): Future[Seq[Int]] = {
Future.sequence(ints.map(f))
}
2) 和第二个,它适用于迭代处理
def batchTraverse(in: Seq[Int], size: Int)(f: Int => Future[Int]): Future[Seq[Int]] = {
val grs = in.grouped(size).toList
def loop(l: Seq[Seq[Int]]): Future[Seq[Int]] = {
l match {
case Nil =>
Future.successful(l.flatten)//? flatten
case head :: tail =>
println("head="+head)
processBatch(head)(f).flatMap{
s => loop(tail).map{ t =>
s.appendedAll(t)
}
}
}
}
loop(grs)
}
从
开始 val fs: Future[Seq[Int]] = batchTraverse(in, 3)(intToFuture)
fs.onComplete{
f => println(f)
}
结果只做了一次迭代,我哪里搞错了?
您的功能实际上似乎运行良好,可能发生的情况是您的程序在 future 有机会完成之前终止,因此您只看到第一次迭代。通过在您的代码中添加 await ,我能够使事情正常进行。
import scala.concurrent._
import scala.concurrent.duration._
val fs: Future[Seq[Int]] = batchTraverse(in, 3)(intToFuture)
fs.onComplete{
f => println(f)
}
Await.result(fs, Duration.Inf)
您可能希望持续时间小于 Duration.Inf
,因为这将永远等待未来完成。这样做我能够得到以下输出:
head=List(1, 1, -1)
Running function 1
Running function 1
Running function -1
head=List(-2, 3, -4)
Running function -4
Running function 3
Running function -2
head=List(5, 6, 7)
Running function 7
Running function 6
Running function 5
head=List(-1, -2, -9)
Running function -9
Running function -2
Running function -1
head=List(1, 2, 2)
Running function 2
Running function 2
Running function 1
Success(List(100, 100, -100, -200, 300, -400, 500, 600, 700, -100, -200, -900, 100, 200, 200))