卡在scala中的期货
stuck with futures in scala
基本上我是 运行 两个关于 cassandra 的期货查询,然后我需要做一些计算和 return 值(值的平均值)。
这是我的代码:
object TestWrapFuture {
def main(args: Array[String]) {
val category = 5392
ExtensiveComputation.average(category).onComplete {
case Success(s) => println(s)
case Failure(f) => throw new Exception(f)
}
}
}
class ExtensiveComputation {
val volume = new ListBuffer[Int]()
def average(categoryId: Int): Future[Double] = {
val productsByCategory = Product.findProductsByCategory(categoryId)
productsByCategory.map { prods =>
for (prod <- prods if prod._2) {
Sku.findSkusByProductId(prod._1).map { skus =>
skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
}
}
val average = volume.sum / volume.length
average
}
}
}
object ExtensiveComputation extends ExtensiveComputation
那么问题是什么?
skus.foreach 正在将结果值附加到 ListBuffer 中。由于一切都是异步的,当我尝试在我的 main 中获取结果时,我收到一条错误消息,说我不能除以零。
事实上,由于我的 Sku.findSkusByProduct return 是一个未来,当我尝试计算平均值时,交易量是空的。
我应该在此计算之前阻止任何事情,还是应该做其他事情?
编辑
嗯,我试过这样屏蔽:
val volume = new ListBuffer[Int]()
def average(categoryId: Int): Future[Double] = {
val productsByCategory = Product.findProductsByCategory(categoryId)
val blocked = productsByCategory.map { prods =>
for (prod <- prods if prod._2) {
Sku.findSkusByProductId(prod._1).map { skus =>
skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
}
}
}
Await.result(blocked, Duration.Inf)
val average = volume.sum / volume.length
Future.successful(average)
}
然后我从这段代码中得到了两个不同的结果:
Sku.findSkusByProductId(prod._1).map { skus =>
skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
}
1 - 当在 cassandra 上只有 50 个要查找时,它会运行并给我结果
2 - 当有很多像 1000 时,它给了我
java.lang.ArithmeticException: / by zero
编辑 2
我按照@Olivier Michallat 的建议尝试了这段代码
def average(categoryId: Int): Future[Double] = {
val productsByCategory = Product.findProductsByCategory(categoryId)
productsByCategory.map { prods =>
for (prod <- prods if prod._2) findBlocking(prod._1)
volume.sum / volume.length
}
}
def findBlocking(productId: Long) = {
val future = Sku.findSkusByProductId(productId).map { skus =>
skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
}
Await.result(future, Duration.Inf)
}
以及@kolmar 提议的以下内容:
def average(categoryId: Int): Future[Int] = {
for {
prods <- Product.findProductsByCategory(categoryId)
filtered = prods.filter(_._2)
skus <- Future.traverse(filtered)(p => Sku.findSkusByProductId(p._1))
} yield {
val volumes = skus.flatten.map(sku => sku.height.get * sku.width.get * sku.length.get)
volumes.sum / volumes.size
}
}
两者都使用一些 skus 来查找 50,但都无法使用许多 skus 来查找 1000 throw ArithmeticException: / by zero
它似乎无法在 return 计算未来之前计算所有内容...
您需要等到 findSkusByProductId
生成的所有期货都完成后再计算平均值。因此,将所有这些未来累积到 Seq
中,对其调用 Future.sequence
以获得 Future[Seq]
,然后将该未来映射到计算平均值的函数。然后将 productsByCategory.map
替换为 flatMap
.
因为你必须调用一个函数 returns 一个 Future
参数序列,最好使用 Future.traverse
。
例如:
object ExtensiveComputation {
def average(categoryId: Int): Future[Double] = {
for {
products <- Product.findProductsByCategory(categoryId)
filtered = products.filter(_._2)
skus <- Future.traverse(filtered)(p => Sku.findSkusByProductId(p._1))
} yield {
val volumes = skus.map { sku =>
sku.height.get * sku.width.get * sku.length.get }
volumes.sum / volumes.size
}
}
}
基本上我是 运行 两个关于 cassandra 的期货查询,然后我需要做一些计算和 return 值(值的平均值)。
这是我的代码:
object TestWrapFuture {
def main(args: Array[String]) {
val category = 5392
ExtensiveComputation.average(category).onComplete {
case Success(s) => println(s)
case Failure(f) => throw new Exception(f)
}
}
}
class ExtensiveComputation {
val volume = new ListBuffer[Int]()
def average(categoryId: Int): Future[Double] = {
val productsByCategory = Product.findProductsByCategory(categoryId)
productsByCategory.map { prods =>
for (prod <- prods if prod._2) {
Sku.findSkusByProductId(prod._1).map { skus =>
skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
}
}
val average = volume.sum / volume.length
average
}
}
}
object ExtensiveComputation extends ExtensiveComputation
那么问题是什么?
skus.foreach 正在将结果值附加到 ListBuffer 中。由于一切都是异步的,当我尝试在我的 main 中获取结果时,我收到一条错误消息,说我不能除以零。
事实上,由于我的 Sku.findSkusByProduct return 是一个未来,当我尝试计算平均值时,交易量是空的。
我应该在此计算之前阻止任何事情,还是应该做其他事情?
编辑
嗯,我试过这样屏蔽:
val volume = new ListBuffer[Int]()
def average(categoryId: Int): Future[Double] = {
val productsByCategory = Product.findProductsByCategory(categoryId)
val blocked = productsByCategory.map { prods =>
for (prod <- prods if prod._2) {
Sku.findSkusByProductId(prod._1).map { skus =>
skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
}
}
}
Await.result(blocked, Duration.Inf)
val average = volume.sum / volume.length
Future.successful(average)
}
然后我从这段代码中得到了两个不同的结果:
Sku.findSkusByProductId(prod._1).map { skus =>
skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
}
1 - 当在 cassandra 上只有 50 个要查找时,它会运行并给我结果
2 - 当有很多像 1000 时,它给了我
java.lang.ArithmeticException: / by zero
编辑 2
我按照@Olivier Michallat 的建议尝试了这段代码
def average(categoryId: Int): Future[Double] = {
val productsByCategory = Product.findProductsByCategory(categoryId)
productsByCategory.map { prods =>
for (prod <- prods if prod._2) findBlocking(prod._1)
volume.sum / volume.length
}
}
def findBlocking(productId: Long) = {
val future = Sku.findSkusByProductId(productId).map { skus =>
skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
}
Await.result(future, Duration.Inf)
}
以及@kolmar 提议的以下内容:
def average(categoryId: Int): Future[Int] = {
for {
prods <- Product.findProductsByCategory(categoryId)
filtered = prods.filter(_._2)
skus <- Future.traverse(filtered)(p => Sku.findSkusByProductId(p._1))
} yield {
val volumes = skus.flatten.map(sku => sku.height.get * sku.width.get * sku.length.get)
volumes.sum / volumes.size
}
}
两者都使用一些 skus 来查找 50,但都无法使用许多 skus 来查找 1000 throw ArithmeticException: / by zero
它似乎无法在 return 计算未来之前计算所有内容...
您需要等到 findSkusByProductId
生成的所有期货都完成后再计算平均值。因此,将所有这些未来累积到 Seq
中,对其调用 Future.sequence
以获得 Future[Seq]
,然后将该未来映射到计算平均值的函数。然后将 productsByCategory.map
替换为 flatMap
.
因为你必须调用一个函数 returns 一个 Future
参数序列,最好使用 Future.traverse
。
例如:
object ExtensiveComputation {
def average(categoryId: Int): Future[Double] = {
for {
products <- Product.findProductsByCategory(categoryId)
filtered = products.filter(_._2)
skus <- Future.traverse(filtered)(p => Sku.findSkusByProductId(p._1))
} yield {
val volumes = skus.map { sku =>
sku.height.get * sku.width.get * sku.length.get }
volumes.sum / volumes.size
}
}
}