Akka-http 使用 Stream 处理请求
Akka-http process requests with Stream
我尝试编写一些简单的基于 akka-http 和 akka-streams 的应用程序来处理 http 请求,始终使用一个预编译流,因为我计划在我的 requestProcessor 流中使用带背压的长时间处理
我的申请代码:
import akka.actor.{ActorSystem, Props}
import akka.http.scaladsl._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import akka.stream.ActorFlowMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.{Sink, Source}
import scala.annotation.tailrec
import scala.concurrent.Future
object UserRegisterSource {
def props: Props = Props[UserRegisterSource]
final case class RegisterUser(username: String)
}
class UserRegisterSource extends ActorPublisher[UserRegisterSource.RegisterUser] {
import UserRegisterSource._
import akka.stream.actor.ActorPublisherMessage._
val MaxBufferSize = 100
var buf = Vector.empty[RegisterUser]
override def receive: Receive = {
case request: RegisterUser =>
if (buf.isEmpty && totalDemand > 0)
onNext(request)
else {
buf :+= request
deliverBuf()
}
case Request(_) =>
deliverBuf()
case Cancel =>
context.stop(self)
}
@tailrec final def deliverBuf(): Unit =
if (totalDemand > 0) {
if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use foreach onNext
} else {
val (use, keep) = buf.splitAt(Int.MaxValue)
buf = keep
use foreach onNext
deliverBuf()
}
}
}
object Main extends App {
val host = "127.0.0.1"
val port = 8094
implicit val system = ActorSystem("my-testing-system")
implicit val fm = ActorFlowMaterializer()
implicit val executionContext = system.dispatcher
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] = Http(system).bind(interface = host, port = port)
val mySource = Source.actorPublisher[UserRegisterSource.RegisterUser](UserRegisterSource.props)
val requestProcessor = mySource
.mapAsync(1)(fakeSaveUserAndReturnCreatedUserId)
.to(Sink.head[Int])
.run()
val route: Route =
get {
path("test") {
parameter('test) { case t: String =>
requestProcessor ! UserRegisterSource.RegisterUser(t)
???
}
}
}
def fakeSaveUserAndReturnCreatedUserId(param: UserRegisterSource.RegisterUser): Future[Int] =
Future.successful {
1
}
serverSource.to(Sink.foreach {
connection =>
connection handleWith Route.handlerFlow(route)
}).run()
}
我找到了有关如何创建可以动态接受要处理的新项目的源的解决方案,但我可以找到任何有关如何在我的路线中获取流执行结果的解决方案
您问题的直接答案是为每个 HttpRequest 实现一个新的 Stream 并使用 Sink.head
来获取您要查找的值。修改您的代码:
val requestStream =
mySource.map(fakeSaveUserAndReturnCreatedUserId)
.to(Sink.head[Int])
//.run() - don't materialize here
val route: Route =
get {
path("test") {
parameter('test) { case t: String =>
//materialize a new Stream here
val userIdFut : Future[Int] = requestStream.run()
requestProcessor ! UserRegisterSource.RegisterUser(t)
//get the result of the Stream
userIdFut onSuccess { case userId : Int => ...}
}
}
}
但是,我认为你的问题是不恰当的。在您的代码示例中,您使用 akka Stream 的唯一目的是创建一个新的 UserId。 Futures 很容易解决这个问题,而不需要物化 Stream(以及所有伴随的 ):
val route: Route =
get {
path("test") {
parameter('test) { case t: String =>
val user = RegisterUser(t)
fakeSaveUserAndReturnCreatedUserId(user) onSuccess { case userId : Int =>
...
}
}
}
}
如果您想将并发调用的数量限制为 fakeSaveUserAndReturnCreateUserId
,那么您可以创建一个具有已定义线程池大小的 ExecutionContext
,如对 this question 的回答中所述,并且使用该 ExecutionContext 创建期货:
val ThreadCount = 10 //concurrent queries
val limitedExecutionContext =
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(ThreadCount))
def fakeSaveUserAndReturnCreatedUserId(param: UserRegisterSource.RegisterUser): Future[Int] =
Future { 1 }(limitedExecutionContext)
我尝试编写一些简单的基于 akka-http 和 akka-streams 的应用程序来处理 http 请求,始终使用一个预编译流,因为我计划在我的 requestProcessor 流中使用带背压的长时间处理
我的申请代码:
import akka.actor.{ActorSystem, Props}
import akka.http.scaladsl._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import akka.stream.ActorFlowMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.{Sink, Source}
import scala.annotation.tailrec
import scala.concurrent.Future
object UserRegisterSource {
def props: Props = Props[UserRegisterSource]
final case class RegisterUser(username: String)
}
class UserRegisterSource extends ActorPublisher[UserRegisterSource.RegisterUser] {
import UserRegisterSource._
import akka.stream.actor.ActorPublisherMessage._
val MaxBufferSize = 100
var buf = Vector.empty[RegisterUser]
override def receive: Receive = {
case request: RegisterUser =>
if (buf.isEmpty && totalDemand > 0)
onNext(request)
else {
buf :+= request
deliverBuf()
}
case Request(_) =>
deliverBuf()
case Cancel =>
context.stop(self)
}
@tailrec final def deliverBuf(): Unit =
if (totalDemand > 0) {
if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use foreach onNext
} else {
val (use, keep) = buf.splitAt(Int.MaxValue)
buf = keep
use foreach onNext
deliverBuf()
}
}
}
object Main extends App {
val host = "127.0.0.1"
val port = 8094
implicit val system = ActorSystem("my-testing-system")
implicit val fm = ActorFlowMaterializer()
implicit val executionContext = system.dispatcher
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] = Http(system).bind(interface = host, port = port)
val mySource = Source.actorPublisher[UserRegisterSource.RegisterUser](UserRegisterSource.props)
val requestProcessor = mySource
.mapAsync(1)(fakeSaveUserAndReturnCreatedUserId)
.to(Sink.head[Int])
.run()
val route: Route =
get {
path("test") {
parameter('test) { case t: String =>
requestProcessor ! UserRegisterSource.RegisterUser(t)
???
}
}
}
def fakeSaveUserAndReturnCreatedUserId(param: UserRegisterSource.RegisterUser): Future[Int] =
Future.successful {
1
}
serverSource.to(Sink.foreach {
connection =>
connection handleWith Route.handlerFlow(route)
}).run()
}
我找到了有关如何创建可以动态接受要处理的新项目的源的解决方案,但我可以找到任何有关如何在我的路线中获取流执行结果的解决方案
您问题的直接答案是为每个 HttpRequest 实现一个新的 Stream 并使用 Sink.head
来获取您要查找的值。修改您的代码:
val requestStream =
mySource.map(fakeSaveUserAndReturnCreatedUserId)
.to(Sink.head[Int])
//.run() - don't materialize here
val route: Route =
get {
path("test") {
parameter('test) { case t: String =>
//materialize a new Stream here
val userIdFut : Future[Int] = requestStream.run()
requestProcessor ! UserRegisterSource.RegisterUser(t)
//get the result of the Stream
userIdFut onSuccess { case userId : Int => ...}
}
}
}
但是,我认为你的问题是不恰当的。在您的代码示例中,您使用 akka Stream 的唯一目的是创建一个新的 UserId。 Futures 很容易解决这个问题,而不需要物化 Stream(以及所有伴随的
val route: Route =
get {
path("test") {
parameter('test) { case t: String =>
val user = RegisterUser(t)
fakeSaveUserAndReturnCreatedUserId(user) onSuccess { case userId : Int =>
...
}
}
}
}
如果您想将并发调用的数量限制为 fakeSaveUserAndReturnCreateUserId
,那么您可以创建一个具有已定义线程池大小的 ExecutionContext
,如对 this question 的回答中所述,并且使用该 ExecutionContext 创建期货:
val ThreadCount = 10 //concurrent queries
val limitedExecutionContext =
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(ThreadCount))
def fakeSaveUserAndReturnCreatedUserId(param: UserRegisterSource.RegisterUser): Future[Int] =
Future { 1 }(limitedExecutionContext)