喷涂路线得到儿童演员的回应
Spray route get response from child actor
我正在尝试弄清楚如何设置调用适当子项的 Master Actor,以支持我尝试模拟数据库调用的一些喷射路径。我是 akka / spray 的新手,所以只是想更好地了解如何正确设置 spray -> actors -> db 调用(等)。我可以从顶级演员那里得到回应,但是当我试图从父级以下的演员那里得到回应时,我似乎什么都做不了。
当查看 actor 的路径时,从我从临时 actor 传递的喷雾路线进行调用的方式看来。以下是我到目前为止用于解决这个问题的内容。这必须只是用户错误/无知,只是不确定如何进行。任何建议,将不胜感激。
下面的 Demo Spray Service 和 Redis Actor 代码片段显示了我从我的路线调用 actor 的位置以及我遇到问题的多个 actor(希望我的路线从 SummaryActor 获得响应)。谢谢!
开机:
object Boot extends App {
// we need an ActorSystem to host our application in
implicit val system = ActorSystem("on-spray-can")
// create and start our service actor
val service = system.actorOf(Props[DemoServiceActor], "demo-service")
implicit val timeout = Timeout(5.seconds)
// start a new HTTP server on port 8080 with our service actor as the handler
IO(Http) ? Http.Bind(service, interface = "localhost", port = 8080)
}
Demo Service Actor(喷)
class DemoServiceActor extends Actor with Api {
// the HttpService trait defines only one abstract member, which
// connects the services environment to the enclosing actor or test
def actorRefFactory = context
// this actor only runs our route, but you could add
// other things here, like request stream processing
// or timeout handling
def receive = handleTimeouts orElse runRoute(route)
//Used to watch for request timeouts
//http://spray.io/documentation/1.1.2/spray-routing/key-concepts/timeout-handling/
def handleTimeouts: Receive = {
case Timedout(x: HttpRequest) =>
sender ! HttpResponse(StatusCodes.InternalServerError, "Too late")
}
}
//Master trait for handling large APIs
//
trait Api extends DemoService {
val route = {
messageApiRouting
}
}
演示喷雾服务(路线):
trait DemoService extends HttpService with Actor {
implicit val timeout = Timeout(5 seconds) // needed for `?` below
val redisActor = context.actorOf(Props[RedisActor], "redisactor")
val messageApiRouting =
path("summary" / Segment / Segment) { (dataset, timeslice) =>
onComplete(getSummary(redisActor, dataset, timeslice)) {
case Success(value) => complete(s"The result was $value")
case Failure(ex) => complete(s"An error occurred: ${ex.getMessage}")
}
}
def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] = Future {
val dbMessage = DbMessage("summary", dataset + timeslice)
val future = redisActor ? dbMessage
val result = Await.result(future, timeout.duration).asInstanceOf[String]
result
}
}
Redis Actor(模拟没有实际的redis客户端)
class RedisActor extends Actor with ActorLogging {
// val pool = REDIS
implicit val timeout = Timeout(5 seconds) // needed for `?` below
val summaryActor = context.actorOf(Props[SummaryActor], "summaryactor")
def receive = {
case msg: DbMessage => {
msg.query match {
case "summary" => {
log.debug("Summary Query Request")
log.debug(sender.path.toString)
summaryActor ! msg
}
}
}
//If not match log an error
case _ => log.error("Received unknown message: {} ")
}
}
class SummaryActor extends Actor with ActorLogging{
def receive = {
case msg: DbMessage =>{
log.debug("Summary Actor Received Message")
//Send back to Spray Route
}
}
}
您的代码的第一个问题是您需要从主要参与者转发到 child,以便 sender
正确传播并可供 child 响应到。所以改变这个(在RedisActor
):
summaryActor ! msg
收件人:
summaryActor forward msg
这是首要问题。解决这个问题,您的代码应该开始工作了。不过还有其他需要注意的地方。您的 getSummary
方法当前定义为:
def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] =
Future {
val dbMessage = DbMessage("summary", dataset + timeslice)
val future = redisActor ? dbMessage
val result = Await.result(future, timeout.duration).asInstanceOf[String]
result
}
这里的问题是 ask
操作 (?
) 已经 return 是一个 Future
,所以你正在阻塞它以获得结果,将其包装在另一个 Future
中,这样您就可以 return 一个 Future
供 onComplete
使用。您应该能够通过直接使用 ask
中的 Future
return 来简化事情,如下所示:
def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] = {
val dbMessage = DbMessage("summary", dataset + timeslice)
(redisActor ? dbMessage).mapTo[String]
}
只是对上述方法的重要评论。
由于 getSummary(...) 函数 returns 一个 Future[String] 对象并且您在 onComplete(...) 函数中调用它,您需要导入:
进口ExecutionContext.Implicits.global
这样你就可以通过让 Future 在范围内拥有 ExecutionContext
声明一个隐式 ExecutionContext 参数。
** 如果你不这样做,你最终会得到一个不匹配的错误
因为 onComplete(...) 期望一个 onComplete 未来
磁铁对象,但你给了一个 Future[String] 对象。
我正在尝试弄清楚如何设置调用适当子项的 Master Actor,以支持我尝试模拟数据库调用的一些喷射路径。我是 akka / spray 的新手,所以只是想更好地了解如何正确设置 spray -> actors -> db 调用(等)。我可以从顶级演员那里得到回应,但是当我试图从父级以下的演员那里得到回应时,我似乎什么都做不了。
当查看 actor 的路径时,从我从临时 actor 传递的喷雾路线进行调用的方式看来。以下是我到目前为止用于解决这个问题的内容。这必须只是用户错误/无知,只是不确定如何进行。任何建议,将不胜感激。
下面的 Demo Spray Service 和 Redis Actor 代码片段显示了我从我的路线调用 actor 的位置以及我遇到问题的多个 actor(希望我的路线从 SummaryActor 获得响应)。谢谢!
开机:
object Boot extends App {
// we need an ActorSystem to host our application in
implicit val system = ActorSystem("on-spray-can")
// create and start our service actor
val service = system.actorOf(Props[DemoServiceActor], "demo-service")
implicit val timeout = Timeout(5.seconds)
// start a new HTTP server on port 8080 with our service actor as the handler
IO(Http) ? Http.Bind(service, interface = "localhost", port = 8080)
}
Demo Service Actor(喷)
class DemoServiceActor extends Actor with Api {
// the HttpService trait defines only one abstract member, which
// connects the services environment to the enclosing actor or test
def actorRefFactory = context
// this actor only runs our route, but you could add
// other things here, like request stream processing
// or timeout handling
def receive = handleTimeouts orElse runRoute(route)
//Used to watch for request timeouts
//http://spray.io/documentation/1.1.2/spray-routing/key-concepts/timeout-handling/
def handleTimeouts: Receive = {
case Timedout(x: HttpRequest) =>
sender ! HttpResponse(StatusCodes.InternalServerError, "Too late")
}
}
//Master trait for handling large APIs
//
trait Api extends DemoService {
val route = {
messageApiRouting
}
}
演示喷雾服务(路线):
trait DemoService extends HttpService with Actor {
implicit val timeout = Timeout(5 seconds) // needed for `?` below
val redisActor = context.actorOf(Props[RedisActor], "redisactor")
val messageApiRouting =
path("summary" / Segment / Segment) { (dataset, timeslice) =>
onComplete(getSummary(redisActor, dataset, timeslice)) {
case Success(value) => complete(s"The result was $value")
case Failure(ex) => complete(s"An error occurred: ${ex.getMessage}")
}
}
def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] = Future {
val dbMessage = DbMessage("summary", dataset + timeslice)
val future = redisActor ? dbMessage
val result = Await.result(future, timeout.duration).asInstanceOf[String]
result
}
}
Redis Actor(模拟没有实际的redis客户端)
class RedisActor extends Actor with ActorLogging {
// val pool = REDIS
implicit val timeout = Timeout(5 seconds) // needed for `?` below
val summaryActor = context.actorOf(Props[SummaryActor], "summaryactor")
def receive = {
case msg: DbMessage => {
msg.query match {
case "summary" => {
log.debug("Summary Query Request")
log.debug(sender.path.toString)
summaryActor ! msg
}
}
}
//If not match log an error
case _ => log.error("Received unknown message: {} ")
}
}
class SummaryActor extends Actor with ActorLogging{
def receive = {
case msg: DbMessage =>{
log.debug("Summary Actor Received Message")
//Send back to Spray Route
}
}
}
您的代码的第一个问题是您需要从主要参与者转发到 child,以便 sender
正确传播并可供 child 响应到。所以改变这个(在RedisActor
):
summaryActor ! msg
收件人:
summaryActor forward msg
这是首要问题。解决这个问题,您的代码应该开始工作了。不过还有其他需要注意的地方。您的 getSummary
方法当前定义为:
def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] =
Future {
val dbMessage = DbMessage("summary", dataset + timeslice)
val future = redisActor ? dbMessage
val result = Await.result(future, timeout.duration).asInstanceOf[String]
result
}
这里的问题是 ask
操作 (?
) 已经 return 是一个 Future
,所以你正在阻塞它以获得结果,将其包装在另一个 Future
中,这样您就可以 return 一个 Future
供 onComplete
使用。您应该能够通过直接使用 ask
中的 Future
return 来简化事情,如下所示:
def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] = {
val dbMessage = DbMessage("summary", dataset + timeslice)
(redisActor ? dbMessage).mapTo[String]
}
只是对上述方法的重要评论。
由于 getSummary(...) 函数 returns 一个 Future[String] 对象并且您在 onComplete(...) 函数中调用它,您需要导入:
进口ExecutionContext.Implicits.global
这样你就可以通过让 Future 在范围内拥有 ExecutionContext 声明一个隐式 ExecutionContext 参数。
** 如果你不这样做,你最终会得到一个不匹配的错误 因为 onComplete(...) 期望一个 onComplete 未来 磁铁对象,但你给了一个 Future[String] 对象。