如何将 MongoDB Scala 异步驱动程序与 Akka Streams 集成?

How can I integrate MongoDB Scala Async driver with Akka Streams?

我正在将旧的 Casbah Mongo 驱动程序迁移到新的异步 Scala 驱动程序,我正尝试在 Akka 流中使用它,但流卡住了。

我有一个定义了 createLogic() 的 GraphStage。代码如下。这在 Casbah 上运行良好,我希望新 mongo 驱动程序的异步特性非常适合,但这里发生了什么......

如果我通过此代码流式传输 2 条记录,则第一条记录流过并触发下一步。请参阅下面的输出('HERE IN SEND' 确认它已通过)。第二条记录似乎在 BlacklistFilter 中通过了正确的步骤,但 Akka 从未流向 SEND 步骤。

知道为什么这不适用于新驱动程序吗?

object BlacklistFilter {
  type FilterShape = FanOutShape2[QM[RenderedExpression], QM[RenderedExpression], QM[Unit]]
}
import BlacklistFilter._
case class BlacklistFilter(facilities: Facilities, helloConfig: HelloConfig)(implicit asys: ActorSystem) extends GraphStage[FilterShape] {
  val outPass: Outlet[QM[RenderedExpression]] = Outlet("Pass")
  val outFail: Outlet[QM[Unit]] = Outlet("Fail")
  val reIn: Inlet[QM[RenderedExpression]] = Inlet("Command")

  override val shape: FilterShape = new FanOutShape2(reIn, outPass, outFail)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    override def preStart(): Unit = pull(reIn)

    setHandler(reIn, new InHandler {
      override def onPush(): Unit = {
        val cmd = grab(reIn)
        val re: RenderedExpression = cmd.body
        val check = re.recipient.contacts(re.media).toString

        // NEW NON-BLOCKING CODE 
        //-------------------------------------
        facilities.withMongo(helloConfig.msgDB, helloConfig.blacklistColl) { coll =>
          var found: Option[Document] = None
          coll.find(Document("_id" -> check)).first().subscribe(
            (doc: Document) => {
              found = Some(doc)
              println("BLACKLIST FAIL! " + check)
              emit(outFail, cmd)
              // no pull() here as this happens on complete below
            },
            (e: Throwable) => {
              // Log something here!
              emit(outFail, cmd)
              pull(reIn)
            },
            () => {
              if (found.isEmpty) {
                println("BLACKLIST OK. " + check)
                emit(outPass, cmd)
              }
              pull(reIn)
              println("Pulled reIn...")
            }
          )
        }

        // OLD BLOCKING CASBAH CODE THAT WORKED
        //-------------------------------------
        // await(facilities.mongoAccess().mongo(helloConfig.msgDB, helloConfig.blacklistColl)(_.findOne(MongoDBObject("_id" -> check)))) match {
        //   case Some(_) => emit(outFail, cmd)
        //   case None    => emit(outPass, cmd)
        // }
        // pull(reIn)
      }
      override def onUpstreamFinish(): Unit = {} // necessary for some reason!
    })
    setHandler(outPass, eagerTerminateOutput)
    setHandler(outFail, eagerTerminateOutput)
  }
}

输出:

BLACKLIST OK. jsmith@yahoo.com
Pulled reIn...
HERE IN SEND (TemplateRenderedExpression)!!!
ACK!
BLACKLIST OK. 919-919-9119
Pulled reIn...

您可以从输出中看到第一条记录顺利地流向 SEND/ACK 步骤。第二条记录打印了 BLACKLIST 消息,这意味着它发出了 outPass 然后调用了 pull on reIn ......但是下游没有任何反应。

有谁知道为什么这在 Akka Streams 中的工作方式与在 Casbah 版本中运行良好(显示的代码已注释掉)不同?

(我可以将 Mongo 调用转换为 Future 并等待它,这应该像旧代码一样工作,但这有点破坏了异步的全部意义!)

那么……"never mind"! :-)

上面的代码看起来应该可以工作。然后我注意到 Akka 的家伙刚刚发布了一个新版本 (2.0.1)。我不确定其中有哪些调整,但无论是什么,上面的代码现在都可以按照我希望的方式运行 w/o 修改。

留下这个 post 以防万一有人遇到类似的问题。