如何响应演员调用的结果?
How to respond with the result of an actor call?
我们正在考虑使用 Akka-HTTP Java API - 使用路由 DSL。
不清楚如何使用路由功能来响应 HttpRequest;使用无类型的 Akka Actor。
例如,在匹配 Route 路径后,我们如何将请求传递给 "handler" ActorRef,后者将以异步方式响应 HttpResponse?
Akka-User 邮件列表上发布了一个类似的问题,但没有后续解决方案 - https://groups.google.com/d/msg/akka-user/qHe3Ko7EVvg/KC-aKz_o5aoJ。
这可以通过 onComplete
directive and the ask 模式的组合来实现。
在下面的示例中,RequestHandlerActor
actor 用于创建基于 HttpRequest
的 HttpResponse
。该 Actor 是从路线中询问的。
我从未使用 Java 作为路由代码,所以我的回复是在 Scala 中。
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.HttpRequest
import akka.actor.Actor
import akka.http.scaladsl.server.Directives._
import akka.actor.Props
import akka.pattern.ask
import akka.util.Timeout
import scala.util.{Success, Failure}
import akka.http.scaladsl.model.StatusCodes.InternalServerError
class RequestHandlerActor extends Actor {
override def receive = {
case httpRequest : HttpRequest =>
sender() ! HttpResponse(entity = "actor responds nicely")
}
}
implicit val actorSystem = ActorSystem()
implicit val timeout = Timeout(5 seconds)
val requestRef = actorSystem actorOf Props[RequestHandlerActor]
val route =
extractRequest { request =>
onComplete((requestRef ? request).mapTo[HttpResponse]) {
case Success(response) => complete(response)
case Failure(ex) =>
complete((InternalServerError, s"Actor not playing nice: ${ex.getMessage}"))
}
}
然后可以像任何其他 Flow 一样将此路由传递给 bindAndHandle
方法。
我一直在寻找与问题作者描述的相同问题的解决方案。最后,我想出了以下 Java 创建路由的代码:
ActorRef ref = system.actorOf(Props.create(RequestHandlerActor.class));
return get(() -> route(
pathSingleSlash(() ->
extractRequest(httpRequest -> {
Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
CompletionStage<HttpResponse> completionStage = PatternsCS.ask(ref, httpRequest, timeout)
.thenApplyAsync(HttpResponse.class::cast);
return completeWithFuture(completionStage);
})
))
);
而RequestHandlerActor
是:
public class RequestHandlerActor extends UntypedActor {
@Override
public void onReceive(Object msg) {
if (msg instanceof HttpRequest) {
HttpResponse httpResponse = HttpResponse.create()
.withEntity(ContentTypes.TEXT_HTML_UTF8,
"<html><body>Hello world!</body></html>");
getSender().tell(httpResponse, getSelf());
} else {
unhandled(msg);
}
}
}
我们正在考虑使用 Akka-HTTP Java API - 使用路由 DSL。
不清楚如何使用路由功能来响应 HttpRequest;使用无类型的 Akka Actor。 例如,在匹配 Route 路径后,我们如何将请求传递给 "handler" ActorRef,后者将以异步方式响应 HttpResponse?
Akka-User 邮件列表上发布了一个类似的问题,但没有后续解决方案 - https://groups.google.com/d/msg/akka-user/qHe3Ko7EVvg/KC-aKz_o5aoJ。
这可以通过 onComplete
directive and the ask 模式的组合来实现。
在下面的示例中,RequestHandlerActor
actor 用于创建基于 HttpRequest
的 HttpResponse
。该 Actor 是从路线中询问的。
我从未使用 Java 作为路由代码,所以我的回复是在 Scala 中。
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.HttpRequest
import akka.actor.Actor
import akka.http.scaladsl.server.Directives._
import akka.actor.Props
import akka.pattern.ask
import akka.util.Timeout
import scala.util.{Success, Failure}
import akka.http.scaladsl.model.StatusCodes.InternalServerError
class RequestHandlerActor extends Actor {
override def receive = {
case httpRequest : HttpRequest =>
sender() ! HttpResponse(entity = "actor responds nicely")
}
}
implicit val actorSystem = ActorSystem()
implicit val timeout = Timeout(5 seconds)
val requestRef = actorSystem actorOf Props[RequestHandlerActor]
val route =
extractRequest { request =>
onComplete((requestRef ? request).mapTo[HttpResponse]) {
case Success(response) => complete(response)
case Failure(ex) =>
complete((InternalServerError, s"Actor not playing nice: ${ex.getMessage}"))
}
}
然后可以像任何其他 Flow 一样将此路由传递给 bindAndHandle
方法。
我一直在寻找与问题作者描述的相同问题的解决方案。最后,我想出了以下 Java 创建路由的代码:
ActorRef ref = system.actorOf(Props.create(RequestHandlerActor.class));
return get(() -> route(
pathSingleSlash(() ->
extractRequest(httpRequest -> {
Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
CompletionStage<HttpResponse> completionStage = PatternsCS.ask(ref, httpRequest, timeout)
.thenApplyAsync(HttpResponse.class::cast);
return completeWithFuture(completionStage);
})
))
);
而RequestHandlerActor
是:
public class RequestHandlerActor extends UntypedActor {
@Override
public void onReceive(Object msg) {
if (msg instanceof HttpRequest) {
HttpResponse httpResponse = HttpResponse.create()
.withEntity(ContentTypes.TEXT_HTML_UTF8,
"<html><body>Hello world!</body></html>");
getSender().tell(httpResponse, getSelf());
} else {
unhandled(msg);
}
}
}