akka-http: send element to akka sink from http route

How can I send elements/messages to an Akka Sink from an Akka HTTP route? My HTTP route still needs to return a normal HTTP response.

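I imagine this requires a stream branch/junction. A normal HTTP route is a flow from HttpRequest -> HttpResponse. I would like to add a branch/junction so that HttpRequests can trigger events to my separate sink as well as generate the normal HttpResponse.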

Below is a very simple single-route akka-http app. For simplicity, I'm using a plain println sink. My production use case will obviously involve a less trivial sink.

def main(args: Array[String]): Unit = {
  implicit val actorSystem = ActorSystem("my-akka-http-test")
  val executor = actorSystem.dispatcher
  implicit val materializer = ActorMaterializer()(actorSystem)

  // I would like to send elements to this sink in response to HTTP GET operations.
  val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

  val route: akka.http.scaladsl.server.Route =
    path("hello" / Segment) { p =>
      get {
        // I'd like to send a message to an Akka Sink as well as return an HTTP response.
        complete {
          s"<h1>Say hello to akka-http. p=$p</h1>"
        }
      }
    }

  val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem)
  val bindingFuture = httpExt.bindAndHandle(RouteResult.route2HandlerFlow(route), "localhost", 8080)

  println("Server online at http://localhost:8080/")
  println("Press RETURN to stop...")
  scala.io.StdIn.readLine()

  bindingFuture
    .flatMap(_.unbind())(executor) // trigger unbinding from the port
    .onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done
}

Edit: Alternatively, when using the low-level akka-http API, how could I send specific messages to a sink from a specific route handler?

def main(args: Array[String]): Unit = {
  implicit val actorSystem = ActorSystem("my-akka-http-test")
  val executor = actorSystem.dispatcher
  implicit val materializer = ActorMaterializer()(actorSystem)

  // I would like to send elements to this sink in response to HTTP GET operations.
  val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

  val requestHandler: HttpRequest => HttpResponse = {
    case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
      HttpResponse(entity = HttpEntity(
        ContentTypes.`text/html(UTF-8)`,
        "<html><body>Hello world!</body></html>"))

    case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
      HttpResponse(entity = "PONG!")

    case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
      sys.error("BOOM!")

    case r: HttpRequest =>
      r.discardEntityBytes() // important to drain incoming HTTP Entity stream
      HttpResponse(404, entity = "Unknown resource!")
  }

  val serverSource = Http().bind(interface = "localhost", port = 8080)

  val bindingFuture: Future[Http.ServerBinding] =
    serverSource.to(Sink.foreach { connection =>
      println("Accepted new connection from " + connection.remoteAddress)

      connection handleWithSyncHandler requestHandler
      // this is equivalent to
      // connection handleWith { Flow[HttpRequest] map requestHandler }
    }).run()

  println("Server online at http://localhost:8080/")
  println("Press RETURN to stop...")
  scala.io.StdIn.readLine()

  bindingFuture
    .flatMap(_.unbind())(executor) // trigger unbinding from the port
    .onComplete(_ => Await.result(actorSystem.terminate(), Duration.Inf))(executor) // and shutdown when done
}

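If you want to send the whole HttpRequest to your sink, I'd say the simplest way is to use the alsoTo combinator. The result would be something along the lines of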

val mySink: Sink[HttpRequest, NotUsed] = ???

val handlerFlow = Flow[HttpRequest].alsoTo(mySink).via(RouteResult.route2HandlerFlow(route))

val bindingFuture = Http().bindAndHandle(handlerFlow, "localhost", 8080)

FYI: alsoTo in fact hides a Broadcast stage.
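To make that Broadcast visible, here is a minimal sketch of roughly what alsoTo does, written with the GraphDSL. The helper name `tee`, the object name and the `demo` method are my own (hypothetical, not part of Akka), and I use plain Int elements instead of HttpRequest so it runs with akka-stream alone. It assumes the same Akka 2.5-style ActorMaterializer used elsewhere in this thread; the real alsoTo may configure cancellation of the Broadcast differently.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object AlsoToAsBroadcast {
  // Hypothetical helper (not part of Akka): sends every element to `side`
  // and also passes it through `main`, using an explicit Broadcast.
  def tee[In, Out](side: Sink[In, Any], main: Flow[In, Out, Any]): Flow[In, Out, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val bcast = b.add(Broadcast[In](2))
      val mainShape = b.add(main)

      bcast.out(0) ~> side         // the branch that alsoTo would create
      bcast.out(1) ~> mainShape.in // the normal processing path

      FlowShape(bcast.in, mainShape.out)
    })

  // Runs a tiny stream through `tee` and returns what the main path emitted.
  def demo(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("also-to-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val printing = Sink.foreach[Int](i => println(s"side branch saw $i"))
      val result = Source(1 to 3).via(tee(printing, Flow[Int].map(_ * 10))).runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

In the HTTP case, `side` would be your Sink[HttpRequest, _] and `main` the flow obtained from RouteResult.route2HandlerFlow(route). In practice alsoTo is shorter, so the explicit graph is mostly useful for seeing what happens under the hood.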

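If instead you need to selectively send a message to a Sink from a specific subroute, you have no other choice but to materialize a new stream for each incoming request. See the example below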

val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

val route: akka.http.scaladsl.server.Route =
  path("hello" / Segment) { p =>
    get {

      (extract(_.request) & extractMaterializer) { (req, mat) ⇒
        Source.single(req).runWith(sink)(mat)

        complete {
          s"<h1>Say hello to akka-http. p=$p</h1>"
        }
      }
    }
  }

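Also, keep in mind you can always ditch the high-level DSL completely and model your whole route using the lower-level streams DSL. This will result in more verbose code, but it gives you full control over your stream materialization.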

EDIT: example below

val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

val handlerFlow =
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val partition = b.add(Partition[HttpRequest](2, {
      case HttpRequest(GET, Uri.Path("/"), _, _, _) ⇒ 0
      case _                                        ⇒ 1
    }))
    val merge = b.add(Merge[HttpResponse](2))

    val happyPath = Flow[HttpRequest].map{ req ⇒
      HttpResponse(entity = HttpEntity(
        ContentTypes.`text/html(UTF-8)`,
        "<html><body>Hello world!</body></html>"))
    }        

    val unhappyPath = Flow[HttpRequest].map{
      case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
        HttpResponse(entity = "PONG!")

      case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
        sys.error("BOOM!")

      case r: HttpRequest =>
        r.discardEntityBytes() // important to drain incoming HTTP Entity stream
        HttpResponse(404, entity = "Unknown resource!")
    }

    partition.out(0).alsoTo(sink) ~> happyPath   ~> merge
    partition.out(1)              ~> unhappyPath ~> merge

    FlowShape(partition.in, merge.out)
  })

val bindingFuture = Http().bindAndHandle(handlerFlow, "localhost", 8080)

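Here is the solution I used, which seems ideal. Akka HTTP seems to be deliberately designed so that your routes are simple HttpRequest->HttpResponse flows and don't involve any extra branches.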

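Rather than building everything into a single Akka stream graph, I have a separate QueueSource->Sink graph, and the normal Akka HTTP HttpRequest->HttpResponse flow just adds elements to the source queue as needed.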

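import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.RouteResult
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source, SourceQueueWithComplete}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

object HttpWithSinkTest {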
  def buildQueueSourceGraph(): RunnableGraph[(SourceQueueWithComplete[String], Future[Done])] = {
    val annotateMessage: Flow[String, String, NotUsed] = Flow.fromFunction[String, String](s => s"got message from queue: $s")

    val sourceQueue = Source.queue[String](100, OverflowStrategy.dropNew)
    val sink: Sink[String, Future[Done]] = Sink.foreach(println)
    val annotatedSink = annotateMessage.toMat(sink)(Keep.right)
    val queueGraph = sourceQueue.toMat(annotatedSink)(Keep.both)

    queueGraph
  }

  def buildHttpFlow(queue: SourceQueueWithComplete[String],
                    actorSystem: ActorSystem, materializer: ActorMaterializer): Flow[HttpRequest, HttpResponse, NotUsed] = {
    implicit val actorSystemI = actorSystem
    implicit val materializerI = materializer

    val route: akka.http.scaladsl.server.Route =
      path("hello" / Segment) { p =>
        get {
          complete {
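            // offer returns a Future[QueueOfferResult]; it is ignored here, so with
            // OverflowStrategy.dropNew a full buffer silently drops the event.
            queue.offer(s"got http event p=$p")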

            s"<h1>Say hello to akka-http. p=$p</h1>"
          }
        }
      }

    val routeFlow = RouteResult.route2HandlerFlow(route)

    routeFlow
  }

  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("my-akka-http-test")
    val executor = actorSystem.dispatcher
    implicit val materializer = ActorMaterializer()(actorSystem)

    val (queue, _) = buildQueueSourceGraph().run()(materializer)

    val httpFlow = buildHttpFlow(queue, actorSystem, materializer)
    val httpExt: akka.http.scaladsl.HttpExt = Http(actorSystem)
    val bindingFuture = httpExt.bindAndHandle(httpFlow, "localhost", 8080)

    println("Server online at http://localhost:8080/")
    println("Press RETURN to stop...")
    scala.io.StdIn.readLine()

    println("Shutting down...")

    val serverBinding = Await.result(bindingFuture, Duration.Inf)
    Await.result(serverBinding.unbind(), Duration.Inf)
    Await.result(actorSystem.terminate(), Duration.Inf)

    println("Done. Exiting")
  }
}
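One caveat with the code above: queue.offer returns a Future[QueueOfferResult], and the route never looks at it. If losing events matters, the route could wait for the offer and answer accordingly. The following is only a sketch of that idea, assuming akka-http's onSuccess directive; the object name `OfferAwareRoute`, the `statusFor` helper and the choice of status codes are mine (hypothetical), not something Akka prescribes.

```scala
import akka.http.scaladsl.model.{StatusCode, StatusCodes}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.SourceQueueWithComplete

object OfferAwareRoute {
  // Hypothetical mapping from the outcome of queue.offer to an HTTP status.
  def statusFor(result: QueueOfferResult): StatusCode = result match {
    case QueueOfferResult.Enqueued    => StatusCodes.OK
    case QueueOfferResult.Dropped     => StatusCodes.ServiceUnavailable
    case QueueOfferResult.QueueClosed => StatusCodes.ServiceUnavailable
    case QueueOfferResult.Failure(_)  => StatusCodes.InternalServerError
  }

  def route(queue: SourceQueueWithComplete[String]): Route =
    path("hello" / Segment) { p =>
      get {
        // Complete the request only after the queue has accepted or rejected the event.
        onSuccess(queue.offer(s"got http event p=$p")) {
          case QueueOfferResult.Enqueued =>
            complete(s"<h1>Say hello to akka-http. p=$p</h1>")
          case other =>
            complete(statusFor(other) -> "event queue unavailable")
        }
      }
    }
}
```

This trades a little latency (the response waits for the offer to resolve) for visibility into dropped events. If fire-and-forget is acceptable, as in my println example, ignoring the result is fine.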