卡在scala中的期货

stuck with futures in scala

基本上我是 运行 两个关于 cassandra 的期货查询,然后我需要做一些计算和 return 值(值的平均值)。

这是我的代码:

object TestWrapFuture {
  def main(args: Array[String]) {
    val category = 5392
    ExtensiveComputation.average(category).onComplete {
      case Success(s) => println(s)
      case Failure(f) => throw new Exception(f)
    }
  }
}

class ExtensiveComputation {

  val volume = new ListBuffer[Int]()

  def average(categoryId: Int): Future[Double] = {

    val productsByCategory = Product.findProductsByCategory(categoryId)

    productsByCategory.map { prods =>
      for (prod <- prods if prod._2) {
        Sku.findSkusByProductId(prod._1).map { skus =>
          skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
        }
      }

      val average = volume.sum / volume.length
      average
    }
  }
}

object ExtensiveComputation extends ExtensiveComputation

那么问题是什么?

skus.foreach 正在将结果值附加到 ListBuffer 中。由于一切都是异步的,当我尝试在我的 main 中获取结果时,我收到一条错误消息,说我不能除以零。

事实上,由于我的 Sku.findSkusByProduct return 是一个未来,当我尝试计算平均值时,交易量是空的。

我应该在此计算之前阻止任何事情,还是应该做其他事情?

编辑

嗯,我试过这样屏蔽:

  val volume = new ListBuffer[Int]()

  def average(categoryId: Int): Future[Double] = {

    val productsByCategory = Product.findProductsByCategory(categoryId)

    val blocked = productsByCategory.map { prods =>
      for (prod <- prods if prod._2) {
        Sku.findSkusByProductId(prod._1).map { skus =>
          skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
        }
      }
    }

    Await.result(blocked, Duration.Inf)
    val average = volume.sum / volume.length
    Future.successful(average)
  }

然后我从这段代码中得到了两个不同的结果:

    Sku.findSkusByProductId(prod._1).map { skus =>
          skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
        }

1 - 当在 cassandra 上只有 50 个要查找时,它会运行并给我结果

2 - 当有很多像 1000 时,它给了我

java.lang.ArithmeticException: / by zero

编辑 2

我按照@Olivier Michallat 的建议尝试了这段代码

   def average(categoryId: Int): Future[Double] = {

    val productsByCategory = Product.findProductsByCategory(categoryId)

    productsByCategory.map { prods =>
      for (prod <- prods if prod._2) findBlocking(prod._1)
      volume.sum / volume.length
    }
  }

  def findBlocking(productId: Long) = {
    val future = Sku.findSkusByProductId(productId).map { skus =>
      skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
    }

    Await.result(future, Duration.Inf)
  }

以及@kolmar 提议的以下内容:

   def average(categoryId: Int): Future[Int] = {
    for {
      prods <- Product.findProductsByCategory(categoryId)
      filtered = prods.filter(_._2)
      skus <- Future.traverse(filtered)(p => Sku.findSkusByProductId(p._1))
    } yield {
      val volumes = skus.flatten.map(sku => sku.height.get * sku.width.get * sku.length.get)
      volumes.sum / volumes.size
    }
  }

两者都使用一些 skus 来查找 50,但都无法使用许多 skus 来查找 1000 throw ArithmeticException: / by zero

它似乎无法在 return 计算未来之前计算所有内容...

您需要等到 findSkusByProductId 生成的所有期货都完成后再计算平均值。因此,将所有这些未来累积到 Seq 中,对其调用 Future.sequence 以获得 Future[Seq],然后将该未来映射到计算平均值的函数。然后将 productsByCategory.map 替换为 flatMap.

因为你必须调用一个函数 returns 一个 Future 参数序列,最好使用 Future.traverse

例如:

object ExtensiveComputation {
  def average(categoryId: Int): Future[Double] = {
    for {
      products <- Product.findProductsByCategory(categoryId)
      filtered = products.filter(_._2)
      skus <- Future.traverse(filtered)(p => Sku.findSkusByProductId(p._1))
    } yield {
      val volumes = skus.map { sku => 
        sku.height.get * sku.width.get * sku.length.get }
      volumes.sum / volumes.size
    }
  }
}