Play + ReactiveMongo:capped collection 和 tailable cursor
Play + ReactiveMongo: capped collection and tailable cursor
我将 Play Framework 与 Scala、Akka 和 ReactiveMongo 一起使用。我想使用 MongoDB 中的集合作为循环队列。多个参与者可以向其中插入文档;一旦这些文档可用(一种发布-订阅系统),一个参与者就会检索这些文档。
我正在使用 capped collections 和 tailable cursor。每次我检索一些文档时,我都必须 运行 命令 EmptyCapped 刷新上限集合(不可能从中删除元素),否则我总是检索相同的文档。有替代解决方案吗?例如,有没有办法在不删除元素的情况下滑动光标?或者在我的情况下最好不要使用上限集合?
object MexDB {
def db: reactivemongo.api.DB = ReactiveMongoPlugin.db
val size: Int = 10000
// creating capped collection
val collection: JSONCollection = {
val c = db.collection[JSONCollection]("messages")
val isCapped = coll.convertToCapped(size, None)
Await.ready(isCapped, Duration.Inf)
c
}
def insert(mex: Mex) = {
val inserted = collection.insert(mex)
inserted onComplete {
case Failure(e) =>
Logger.info("Error while inserting task: " + e.getMessage())
throw e
case Success(i) =>
Logger.info("Successfully inserted task")
}
}
def find(): Enumerator[Mex] = {
val cursor: Cursor[Mex] = collection
.find(Json.obj())
.options(QueryOpts().tailable.awaitData)
.cursor[Mex]
// meaning of maxDocs ???
val maxDocs = 1
cursor.enumerate(maxDocs)
}
def removeAll() = {
db.command(new EmptyCapped("messages"))
}
}
/*** part of receiver actor code ***/
// inside preStart
val it = Iteratee.fold[Mex, List[Mex]](Nil) {
(partialList, mex) => partialList ::: List(mex)
}
// Inside "receive" method
case Data =>
val e: Enumerator[Mex] = MexDB.find()
val future = e.run(it)
future onComplete {
case Success(list) =>
list foreach { mex =>
Logger.info("Mex: " + mex.id)
}
MexDB.removeAll()
self ! Data
case Failure(e) => Logger.info("Error: "+ e.getMessage())
}
您的 tailable 游标在每个找到文档后关闭,如 maxDocs = 1
。要使其无限期打开,您应该忽略此限制。
对于 awaitData
,.onComplete
只有在您明确关闭 RM 时才会被调用。
您需要使用游标中的一些流函数,例如 .enumerate
并处理每个新的 step/result。参见 https://github.com/sgodbillon/reactivemongo-tailablecursor-demo/
我将 Play Framework 与 Scala、Akka 和 ReactiveMongo 一起使用。我想使用 MongoDB 中的集合作为循环队列。多个参与者可以向其中插入文档;一旦这些文档可用(一种发布-订阅系统),一个参与者就会检索这些文档。 我正在使用 capped collections 和 tailable cursor。每次我检索一些文档时,我都必须 运行 命令 EmptyCapped 刷新上限集合(不可能从中删除元素),否则我总是检索相同的文档。有替代解决方案吗?例如,有没有办法在不删除元素的情况下滑动光标?或者在我的情况下最好不要使用上限集合?
object MexDB {
def db: reactivemongo.api.DB = ReactiveMongoPlugin.db
val size: Int = 10000
// creating capped collection
val collection: JSONCollection = {
val c = db.collection[JSONCollection]("messages")
val isCapped = coll.convertToCapped(size, None)
Await.ready(isCapped, Duration.Inf)
c
}
def insert(mex: Mex) = {
val inserted = collection.insert(mex)
inserted onComplete {
case Failure(e) =>
Logger.info("Error while inserting task: " + e.getMessage())
throw e
case Success(i) =>
Logger.info("Successfully inserted task")
}
}
def find(): Enumerator[Mex] = {
val cursor: Cursor[Mex] = collection
.find(Json.obj())
.options(QueryOpts().tailable.awaitData)
.cursor[Mex]
// meaning of maxDocs ???
val maxDocs = 1
cursor.enumerate(maxDocs)
}
def removeAll() = {
db.command(new EmptyCapped("messages"))
}
}
/*** part of receiver actor code ***/
// inside preStart
val it = Iteratee.fold[Mex, List[Mex]](Nil) {
(partialList, mex) => partialList ::: List(mex)
}
// Inside "receive" method
case Data =>
val e: Enumerator[Mex] = MexDB.find()
val future = e.run(it)
future onComplete {
case Success(list) =>
list foreach { mex =>
Logger.info("Mex: " + mex.id)
}
MexDB.removeAll()
self ! Data
case Failure(e) => Logger.info("Error: "+ e.getMessage())
}
您的 tailable 游标在每个找到文档后关闭,如 maxDocs = 1
。要使其无限期打开,您应该忽略此限制。
对于 awaitData
,.onComplete
只有在您明确关闭 RM 时才会被调用。
您需要使用游标中的一些流函数,例如 .enumerate
并处理每个新的 step/result。参见 https://github.com/sgodbillon/reactivemongo-tailablecursor-demo/