我可以将 Action.async 与多个 Futures 一起使用吗?

Can I use Action.async with multiple Futures?

中,我得到了有关将 Scala Futures 与 PlayFramework 结合使用的建议,谢谢。现在事情变得有点复杂了。比方说,在我只需要绘制可以找到水果的位置之前:

def getMapData(coll: MongoCollection[Document], s: String): Future[Seq[Document]] = ...

def mapFruit(collection: MongoCollection[Document]) = Action.async {
  val fut = getMapData(collection, "fruit")
  fut.map { docs: Seq[Document] =>
    Ok(docs.toJson)
  } recover {
    case e => Console.err.println("FAIL: " + e.getMessage); BadRequest("FAIL")
  }
}

事实证明,人们更关心苹果而不是香蕉或樱桃,因此如果地图上显示的项目不超过 100 个,人们希望苹果优先于香蕉和樱桃,但不超过某个百分比地图上的项目应该是苹果。某些函数 pickDocs 决定了正确的组合。我认为这样的事情可能会奏效,但不行:

def mapApplesBananasCherries(collection: MongoCollection[Document]) = Action.async {
  val futA = getMapData(collection, "apples")
  val futB = getMapData(collection, "bananas")
  val futC = getMapData(collection, "cherries")
  futA.map { docsA: Seq[Document] =>
    futB.map { docsB: Seq[Document] =>
      futC.map { docsC: Seq[Document] =>
        val docsPicked = pickDocs(100, docsA, docsB, docsC)
        Ok(docsPicked.toJson)
      }
    }
    // won't compile without something here, e.g. Ok("whatever")
  } recover {
    case e => Console.err.println("FAIL: " + e.getMessage); BadRequest("FAIL")
  }
}

当我只有一个 Future 时,生活很简单,但现在我有三个。我该怎么做才能使 (1) 工作和 (2) 再次变得简单?在所有三个 Futures 都具有值之前,我无法真正构建 Web 响应。

基本上,你应该使用 flatMap

futA.flatMap { docsA: Seq[String] =>
  futB.flatMap { docsB: Seq[String] =>
    futC.map { docsC: Seq[String] =>
      docsPicked = pickDocs(100, docsA, docsB, docsC)
        Ok(docsPicked.toJson)
      }
    }
}

此外,您可以使用理解:

val res = for {
  docsA <- futA
  docsB <- futB
  docsC <- futC
} yield Ok(pickDocs(100, docsA, docsB, docsC).toJson)
res.recover {
  case e => Console.err.println("FAIL: " + e.getMessage); BadRequest("FAIL")
}

这是期货和类似 类 的非常常见的模式 "contain values"(例如 OptionList

要使用 flatMap 方法合并结果,结果代码为

def mapApplesBananasCherries(collection: MongoCollection[Document]) = Action.async {
  val futA = getMapData(collection, "apples")
  val futB = getMapData(collection, "bananas")
  val futC = getMapData(collection, "cherries")
  futA.flatMap { docsA =>
    futB.flatMap { docsB =>
      futC.map { docsC =>
        val docsPicked = pickDocs(100, docsA, docsB, docsC)
        Ok(docsPicked.toJson)
      }
    }
  } recover {
    case e => Console.err.println("FAIL: " + e.getMessage); BadRequest("FAIL")
  }
}

事实上,它是如此常见,以至于存在一种特殊的语法以使其更具可读性,称为 for-comprehension:以下代码等同于前面的代码片段

def mapApplesBananasCherries(collection: MongoCollection[Document]) = Action.async {
  val futA = getMapData(collection, "apples")
  val futB = getMapData(collection, "bananas")
  val futC = getMapData(collection, "cherries")
  for {
    apples <- futA
    bananas <- futB
    cherries <- futC
  } yield {
    val docsPicked = pickDocs(100, apples, bananas, cherries)
    Ok(docsPicked.toJson)
  } recover {
    case e => Console.err.println("FAIL: " + e.getMessage); BadRequest("FAIL")
  }
}

如果我的理解是你想以那个优先级执行苹果、樱桃和香蕉,我会编写类似这样的代码

import scala.concurrent.{Await, Future}
import scala.util.Random
import scala.concurrent.duration._

object WaitingFutures extends App {

  implicit val ec = scala.concurrent.ExecutionContext.Implicits.global

  val apples = Future {50 + Random.nextInt(100)}
  val cherries = Future {50 + Random.nextInt(100)}
  val bananas =  Future {50 + Random.nextInt(100)}

  val mix = for {
    app <- apples
    cher <- if (app < 100) cherries else Future {0}
    ban <- if (app + cher < 100) bananas else Future {0}
  } yield (app,cher,ban)


  mix.onComplete {m =>
    println(s"mix ${m.get}")
  }

  Await.result(mix, 3 seconds)

}

如果未来完成时苹果 returns 超过 100,它不会等到樱桃或香蕉完成,而是 returns 0 的虚拟未来。如果不够,它将等到樱桃被执行等等。

注意我没有在如何发出 if 信号上投入太多精力,所以我使用的是虚拟未来,这可能不是最好的方法。

这不会编译,因为您嵌套的 future 块返回 Future[Future[Future[Response]]]。如果您改为在期货上使用 flatMap,您的期货将不会嵌套。

如果您希望减少重复,可以使用 Future.sequence 来同时启动期货。您可以使用模式匹配来重新提取列表:

val futureCollections = List("apples", "bananas", "cherries").map{ getMapData(collection, _) }

Future.sequence(futureCollections) map { case docsA :: docsB :: docsC :: Nil =>
  Ok(pickDocs(100, docsA, docsB, docsC).toJson)
} recover {
  case e => Console.err.println("FAIL: " + e.getMessage); BadRequest("FAIL")
}

或者您可以将 pickDocs 函数交给一个列表列表(按优先级排序)供其选择。

Future.sequence(futureCollections) map { docLists =>
  Ok(pickDocs(docLists, 100, 0.75f).toJson)
} recover {
  case e => Console.err.println("FAIL: " + e.getMessage); BadRequest("FAIL")
}

pickDocs 实现将占用列表头部的一定百分比,除非完整列表中没有足够的文档,它需要更多,然后递归地对剩余的应用相同的百分比插槽列表。


def pickDocs[T](lists: List[List[T]], max: Int, dampPercentage: Float): List[T] = {
  lists match {
    case Nil => Nil
    case head :: tail =>
      val remainingLength = tail.flatten.length
      val x = max - remainingLength
      val y = math.ceil(max * dampPercentage).toInt
      val fromHere = head.take(x max y)
      fromHere ++ pickDocs(tail, max - fromHere.length, dampPercentage)
  }
}