等待将来完成以执行另一个序列

Wait future completion to execute a another one for a sequence

最近几天我遇到了读/写问题,我无法在测试中解决问题。我有一个基于以下模型

的 JSON 文档
package models 

import play.api.libs.json._

object Models {

  case class Record
  (
    id : Int, 
    samples : List[Double]
  )

  object Record {
    implicit val recordFormat = Json.format[Record]
  }
}

我有两个功能:一个是读取记录,另一个是更新。

case class MongoIO(futureCollection : Future[JSONCollection]) {

  def readRecord(id : Int) : Future[Option[Record]] = 
    futureCollection
      .flatMap { collection =>
      collection.find(Json.obj("id" -> id)).one[Record]
    }

  def updateRecord(id : Int, newSample : Double) : Future[UpdateWriteResult]= {
    readRecord(id) flatMap { recordOpt =>
      recordOpt match {
        case None =>
          Future { UpdateWriteResult(ok = false, -1, -1, Seq(), Seq(), None, None, None) }
        case Some(record) =>
          val newRecord = 
            record.copy(samples = record.samples :+ newSample)
          futureCollection
          .flatMap { collection =>
            collection.update(Json.obj("id" -> id), newRecord)
          }
      }
    }
  }
}

现在,我有一个 List[Future[UpdateWriteResult]] 对应于文档上的许多更新,但我想要的是:等待未来完成执行第二个然后等待第二个完成执行第三。我试着用这样的 foldLeft and flatMap 来做到这一点:

val l : List[Future[UpdateWriteResult]] = ...
println(l.size) // give me 10
l
.foldLeft(Future.successful(UpdateWriteResult(ok = false, -1, -1, Seq(), Seq(), None, None, None))) { 
     case (cur, next) => cur.flatMap(_ => next)
  }

但是文档从来没有像例外那样更新过:我得到了一个包含 1 个样本的列表,而不是有一个样本列表大小为 10 的文档。所以读比写快(我的印象)并且使用 foldLeft / flatMap 的组合似乎不等待当前未来的完成所以我怎样才能正确解决这个问题(没有等待)?

更新

val futureCollection = DB.getCollection("foo")
val mongoIO = MongoIO(futureCollection)
val id = 1
val samples = List(1.1, 2.2, 3.3)
val l : List[Future[UpdateWriteResult]] = samples.map(sample => mongoIO.updateRecord(id, sample))

您必须在 samples 上执行 foldLeft:

(mongoIO.updateRecord(id, samples.head) /: samples.tail) {(acc, next) =>
  acc.flatMap(_ => mongoIO.updateRecord(id, next))
}

updateRecord 是触发未来的东西,所以你必须确保 在前一个完成之前不要调用它

我冒昧地对你的原始资料做了一些小的修改:

case class MongoIO(futureCollection : Future[JSONCollection]) {

  def readRecord(id : Int) : Future[Option[Record]] = 
    futureCollection.flatMap(_.find(Json.obj("id" -> id)).one[Record])

  def updateRecord(id : Int, newSample : Double) : Future[UpdateWriteResult] =
    readRecord(id) flatMap {
      case None => Future.successful(UpdateWriteResult(ok = false, -1, -1, Nil, Nil, None, None, None))
      case Some(record) =>
        val newRecord = record.copy(samples = record.samples :+ newSample)
        futureCollection.flatMap(_.update(Json.obj("id" -> id), newRecord))
    }
}

那我们只需要写一个serialized/sequential更新即可:

def update(samples: Seq[Double], id: Int): Future[Unit] = s match {
  case sample +: remainingSamples => updateRecord(id, sample).flatMap(_ => update(remainingSamples, id))
  case _ => Future.successful(())
}