http4s, Service Executor and Mongodb: How can I wait for insertOne to finish

提前为基本问题道歉。我开始使用 http4s 和路由器处理程序学习 Scala,我正在尝试输入 MongoDB 的条目。据我所知 insertOne returns a Observable[Completed].

知道如何在返回响应之前等待 observalbe 完成吗?


class Routes {
    val service: HttpService = HttpService {
        case r @ GET -> Root / "hello" => {
            val mongoClient: MongoClient = MongoClient()
            val database: MongoDatabase = mongoClient.getDatabase("scala")
            val collection: MongoCollection[Document] = database.getCollection("tests")
            val doc: Document = Document("_id" -> 0, "name" -> "MongoDB", "type" -> "database",
                                 "count" -> 1, "info" -> Document("x" -> 203, "y" -> 102))

class GomadApp(host: String, port: Int) {
  private val pool = Executors.newCachedThreadPool()

  println(s"Starting server on '$host:$port'")

  val routes = new Routes().service

  // Add some logging to the service
  val service: HttpService = routes.local { req =>
    val path = req.uri
    val start = System.nanoTime()
    val result = req
    val time = ((System.nanoTime() - start) / 1000) / 1000.0
    println(s"${req.remoteAddr.getOrElse("null")} -> ${req.method}: $path in $time ms")

  // Construct the blaze pipeline.
  def build(): ServerBuilder =
      .bindHttp(port, host)

object GomadApp extends ServerApp {
  val ip   = ""
  val port = envOrNone("HTTP_PORT") map (_.toInt) getOrElse (8787)

  override def server(args: List[String]): Task[Server] =
    new GomadApp(ip, port)


我推荐 - 虽然您必须分叉它并稍微增加依赖性,但 scalaz 7.1 和 7.2 不是二进制兼容的。


collection.insertOne(doc).toFuture().toTask.flatMap({res => Ok("Hello")})


This tweet made me wonder: Do you consider Futures "totally unusable" or is this just hyperbole? I've never had a major problem, but I'm willing to be enlightened. Doesn't the following code make Futures effectively "lazy"? def myFuture = Future { 42 } And, finally, I've also heard rumblings that scalaz's Tasks have some failings as well, but I haven't found much on it. Anybody have more details?


The fundamental problem is that constructing a Future with a side-effecting expression is itself a side-effect. You can only reason about Future for pure computations, which unfortunately is not how they are commonly used. Here is a demonstration of this operation breaking referential transparency:

import scala.concurrent.Future
import scala.util.Random

val f1 = { 
  val r = new Random(0L)
  val x = Future(r.nextInt)
  for { 
    a <- x
    b <- x
  } yield (a, b) 

// Same as f1, but I inlined `x`
val f2 = { 
  val r = new Random(0L)
  for { 
    a <- Future(r.nextInt)
    b <- Future(r.nextInt)
  } yield (a, b) 

f1.onComplete(println) // Success((-1155484576,-1155484576))
f2.onComplete(println) // Success((-1155484576,-723955400))    <-- not the same

However this works fine with Task. Note that the interesting one is the non-inlined version, which manages to produce two distinct Int values. This is the important bit: Task has a constructor that captures side-effects as values, and Future does not.

import scalaz.concurrent.Task

val task1 = { 
  val r = new Random(0L)
  val x = Task.delay(r.nextInt)
  for { 
    a <- x
    b <- x 
  } yield (a, b) 

// Same as task1, but I inlined `x`
val task2 = { 
  val r = new Random(0L)
  for { 
    a <- Task.delay(r.nextInt)
    b <- Task.delay(r.nextInt)
  } yield (a, b) 

println( // (-1155484576,-723955400)
println( // (-1155484576,-723955400)

Most of the commonly-cited differences like "a Task doesn't run until you ask it to" and "you can compose the same Task over and over" trace back to this fundamental distinction. So the reason it's "totally unusable" is that once you're used to programming with pure values and relying on equational reasoning to understand and manipulate programs it's hard to go back to side-effecty world where things are much harder to understand.