如何将 MongoDB Scala 异步驱动程序与 Akka Streams 集成?
How can I integrate MongoDB Scala Async driver with Akka Streams?
我正在将旧的 Casbah Mongo 驱动程序迁移到新的异步 Scala 驱动程序,我正尝试在 Akka 流中使用它,但流卡住了。
我有一个定义了 createLogic() 的 GraphStage。代码如下。这在 Casbah 上运行良好,我希望新 mongo 驱动程序的异步特性非常适合,但这里发生了什么......
如果我通过此代码流式传输 2 条记录,则第一条记录流过并触发下一步。请参阅下面的输出('HERE IN SEND' 确认它已通过)。第二条记录似乎在 BlacklistFilter 中通过了正确的步骤,但 Akka 从未流向 SEND 步骤。
知道为什么这不适用于新驱动程序吗?
object BlacklistFilter {
type FilterShape = FanOutShape2[QM[RenderedExpression], QM[RenderedExpression], QM[Unit]]
}
import BlacklistFilter._
case class BlacklistFilter(facilities: Facilities, helloConfig: HelloConfig)(implicit asys: ActorSystem) extends GraphStage[FilterShape] {
val outPass: Outlet[QM[RenderedExpression]] = Outlet("Pass")
val outFail: Outlet[QM[Unit]] = Outlet("Fail")
val reIn: Inlet[QM[RenderedExpression]] = Inlet("Command")
override val shape: FilterShape = new FanOutShape2(reIn, outPass, outFail)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
override def preStart(): Unit = pull(reIn)
setHandler(reIn, new InHandler {
override def onPush(): Unit = {
val cmd = grab(reIn)
val re: RenderedExpression = cmd.body
val check = re.recipient.contacts(re.media).toString
// NEW NON-BLOCKING CODE
//-------------------------------------
facilities.withMongo(helloConfig.msgDB, helloConfig.blacklistColl) { coll =>
var found: Option[Document] = None
coll.find(Document("_id" -> check)).first().subscribe(
(doc: Document) => {
found = Some(doc)
println("BLACKLIST FAIL! " + check)
emit(outFail, cmd)
// no pull() here as this happens on complete below
},
(e: Throwable) => {
// Log something here!
emit(outFail, cmd)
pull(reIn)
},
() => {
if (found.isEmpty) {
println("BLACKLIST OK. " + check)
emit(outPass, cmd)
}
pull(reIn)
println("Pulled reIn...")
}
)
}
// OLD BLOCKING CASBAH CODE THAT WORKED
//-------------------------------------
// await(facilities.mongoAccess().mongo(helloConfig.msgDB, helloConfig.blacklistColl)(_.findOne(MongoDBObject("_id" -> check)))) match {
// case Some(_) => emit(outFail, cmd)
// case None => emit(outPass, cmd)
// }
// pull(reIn)
}
override def onUpstreamFinish(): Unit = {} // necessary for some reason!
})
setHandler(outPass, eagerTerminateOutput)
setHandler(outFail, eagerTerminateOutput)
}
}
输出:
BLACKLIST OK. jsmith@yahoo.com
Pulled reIn...
HERE IN SEND (TemplateRenderedExpression)!!!
ACK!
BLACKLIST OK. 919-919-9119
Pulled reIn...
您可以从输出中看到第一条记录顺利地流向 SEND/ACK 步骤。第二条记录打印了 BLACKLIST 消息,这意味着它发出了 outPass 然后调用了 pull on reIn ......但是下游没有任何反应。
有谁知道为什么这在 Akka Streams 中的工作方式与在 Casbah 版本中运行良好(显示的代码已注释掉)不同?
(我可以将 Mongo 调用转换为 Future 并等待它,这应该像旧代码一样工作,但这有点破坏了异步的全部意义!)
那么……"never mind"! :-)
上面的代码看起来应该可以工作。然后我注意到 Akka 的家伙刚刚发布了一个新版本 (2.0.1)。我不确定其中有哪些调整,但无论是什么,上面的代码现在都可以按照我希望的方式运行 w/o 修改。
留下这个 post 以防万一有人遇到类似的问题。
我正在将旧的 Casbah Mongo 驱动程序迁移到新的异步 Scala 驱动程序,我正尝试在 Akka 流中使用它,但流卡住了。
我有一个定义了 createLogic() 的 GraphStage。代码如下。这在 Casbah 上运行良好,我希望新 mongo 驱动程序的异步特性非常适合,但这里发生了什么......
如果我通过此代码流式传输 2 条记录,则第一条记录流过并触发下一步。请参阅下面的输出('HERE IN SEND' 确认它已通过)。第二条记录似乎在 BlacklistFilter 中通过了正确的步骤,但 Akka 从未流向 SEND 步骤。
知道为什么这不适用于新驱动程序吗?
object BlacklistFilter {
type FilterShape = FanOutShape2[QM[RenderedExpression], QM[RenderedExpression], QM[Unit]]
}
import BlacklistFilter._
case class BlacklistFilter(facilities: Facilities, helloConfig: HelloConfig)(implicit asys: ActorSystem) extends GraphStage[FilterShape] {
val outPass: Outlet[QM[RenderedExpression]] = Outlet("Pass")
val outFail: Outlet[QM[Unit]] = Outlet("Fail")
val reIn: Inlet[QM[RenderedExpression]] = Inlet("Command")
override val shape: FilterShape = new FanOutShape2(reIn, outPass, outFail)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
override def preStart(): Unit = pull(reIn)
setHandler(reIn, new InHandler {
override def onPush(): Unit = {
val cmd = grab(reIn)
val re: RenderedExpression = cmd.body
val check = re.recipient.contacts(re.media).toString
// NEW NON-BLOCKING CODE
//-------------------------------------
facilities.withMongo(helloConfig.msgDB, helloConfig.blacklistColl) { coll =>
var found: Option[Document] = None
coll.find(Document("_id" -> check)).first().subscribe(
(doc: Document) => {
found = Some(doc)
println("BLACKLIST FAIL! " + check)
emit(outFail, cmd)
// no pull() here as this happens on complete below
},
(e: Throwable) => {
// Log something here!
emit(outFail, cmd)
pull(reIn)
},
() => {
if (found.isEmpty) {
println("BLACKLIST OK. " + check)
emit(outPass, cmd)
}
pull(reIn)
println("Pulled reIn...")
}
)
}
// OLD BLOCKING CASBAH CODE THAT WORKED
//-------------------------------------
// await(facilities.mongoAccess().mongo(helloConfig.msgDB, helloConfig.blacklistColl)(_.findOne(MongoDBObject("_id" -> check)))) match {
// case Some(_) => emit(outFail, cmd)
// case None => emit(outPass, cmd)
// }
// pull(reIn)
}
override def onUpstreamFinish(): Unit = {} // necessary for some reason!
})
setHandler(outPass, eagerTerminateOutput)
setHandler(outFail, eagerTerminateOutput)
}
}
输出:
BLACKLIST OK. jsmith@yahoo.com
Pulled reIn...
HERE IN SEND (TemplateRenderedExpression)!!!
ACK!
BLACKLIST OK. 919-919-9119
Pulled reIn...
您可以从输出中看到第一条记录顺利地流向 SEND/ACK 步骤。第二条记录打印了 BLACKLIST 消息,这意味着它发出了 outPass 然后调用了 pull on reIn ......但是下游没有任何反应。
有谁知道为什么这在 Akka Streams 中的工作方式与在 Casbah 版本中运行良好(显示的代码已注释掉)不同?
(我可以将 Mongo 调用转换为 Future 并等待它,这应该像旧代码一样工作,但这有点破坏了异步的全部意义!)
那么……"never mind"! :-)
上面的代码看起来应该可以工作。然后我注意到 Akka 的家伙刚刚发布了一个新版本 (2.0.1)。我不确定其中有哪些调整,但无论是什么,上面的代码现在都可以按照我希望的方式运行 w/o 修改。
留下这个 post 以防万一有人遇到类似的问题。