播放 mongo 枚举器意外停止
Play mongo Enumerator stops unexpectedly
设置 Scala 2.11.4、Playframework 2.3.7、Reacivemongo(0.10.5.0.akka23/0.11.0-SNAPSHOT 都尝试过)。
我们有一个包含 18'000 个实体的集合,使用 Enumerator/Iteratee 方法以异步方式处理这个集合。
案例一。
处理很简单(将实体提取为 CSV 格式并将它们作为 REST 响应以块的形式发送)一切正常,所有记录都被提取和处理。
案例二。
处理涉及最多需要 10 秒的计算,以及计算后更新记录,计算是使用 foreach Iteratee 完成的,它会更新内部任务跟踪器中已处理实体的数量。处理过程可能需要一段时间,但没关系
Patient.findByClient(clientName) &>
Enumeratee.mapM(patient => {
val evaluatedAndSaveTask = patient.
evaluate(parser).
flatMap(patientOpt =>
patientOpt.
map(evaluatedPatient => evaluatedPatient.saveAndGet().map(Some(_))).
getOrElse(Future.successful(None))
)
evaluatedAndSaveTask.recover({
case t =>
t.printStackTrace()
None
})
})
// Step 2.1. Running evaluation process through Iteratee
val evaluationTask = evaluation run Iteratee.foreach(patientOpt => {
collection.update(Json.obj("clientName" -> clientName), Json.obj("$inc" -> Json.obj("processedPatients" -> 1))))
)
// Step 2.3. Log errors
evaluationTask.onSuccess({ case _ => Patient.LOG.info("PatientEvaluation DONE") })
evaluationTask.onFailure({ case t => {
t.printStackTrace();
Patient.LOG.info("PatientEvaluation FAILED");
}})
在这种情况下,只有 575 个实体得到处理,Iteratee 结束打印输出 "Patient evaluation DONE"。
我从等式中删除了保存,但没有帮助。
为什么会这样?
我终于找到了问题的罪魁祸首 - Mongo 超时后自动过期评估,您可以指定 noCursorTimeout 标志,以防止这种情况发生:
collection.
find(findQ).
sort(if(sortQF.values.isEmpty) sortQ else sortQF).
options(QueryOpts(skipN = offset + page._1 * page._2).noCursorTimeout).
cursor[T].
出于某种原因,ReactiveMongo 在这种情况下不会抛出 Exception,只是关闭 Iterator。我在 ReactiveMongo https://github.com/ReactiveMongo/ReactiveMongo/issues/250 中创建了一个 Issue,在此之后。
目前,对我来说,使游标过期并以偏移量重新启动可能更安全。
设置 Scala 2.11.4、Playframework 2.3.7、Reacivemongo(0.10.5.0.akka23/0.11.0-SNAPSHOT 都尝试过)。
我们有一个包含 18'000 个实体的集合,使用 Enumerator/Iteratee 方法以异步方式处理这个集合。
案例一。 处理很简单(将实体提取为 CSV 格式并将它们作为 REST 响应以块的形式发送)一切正常,所有记录都被提取和处理。
案例二。 处理涉及最多需要 10 秒的计算,以及计算后更新记录,计算是使用 foreach Iteratee 完成的,它会更新内部任务跟踪器中已处理实体的数量。处理过程可能需要一段时间,但没关系
Patient.findByClient(clientName) &>
Enumeratee.mapM(patient => {
val evaluatedAndSaveTask = patient.
evaluate(parser).
flatMap(patientOpt =>
patientOpt.
map(evaluatedPatient => evaluatedPatient.saveAndGet().map(Some(_))).
getOrElse(Future.successful(None))
)
evaluatedAndSaveTask.recover({
case t =>
t.printStackTrace()
None
})
})
// Step 2.1. Running evaluation process through Iteratee
val evaluationTask = evaluation run Iteratee.foreach(patientOpt => {
collection.update(Json.obj("clientName" -> clientName), Json.obj("$inc" -> Json.obj("processedPatients" -> 1))))
)
// Step 2.3. Log errors
evaluationTask.onSuccess({ case _ => Patient.LOG.info("PatientEvaluation DONE") })
evaluationTask.onFailure({ case t => {
t.printStackTrace();
Patient.LOG.info("PatientEvaluation FAILED");
}})
在这种情况下,只有 575 个实体得到处理,Iteratee 结束打印输出 "Patient evaluation DONE"。
我从等式中删除了保存,但没有帮助。
为什么会这样?
我终于找到了问题的罪魁祸首 - Mongo 超时后自动过期评估,您可以指定 noCursorTimeout 标志,以防止这种情况发生:
collection.
find(findQ).
sort(if(sortQF.values.isEmpty) sortQ else sortQF).
options(QueryOpts(skipN = offset + page._1 * page._2).noCursorTimeout).
cursor[T].
出于某种原因,ReactiveMongo 在这种情况下不会抛出 Exception,只是关闭 Iterator。我在 ReactiveMongo https://github.com/ReactiveMongo/ReactiveMongo/issues/250 中创建了一个 Issue,在此之后。
目前,对我来说,使游标过期并以偏移量重新启动可能更安全。