从 Akka Actor 获取消息
Get a message out of an Akka Actor
我构建了一个定期查询 API 的 Akka actor,如下所示:
val cancellable =
system.scheduler.schedule(0 milliseconds,
5 seconds,
actor,
QueryController(1))
Actor,本质上是:
object UpdateStatistics {
/**
* Query the controller for the given switch Id
*
* @param dpId Switch's Id
*/
case class QueryController(dpId: Int)
case object Stop
def props: Props = Props[UpdateStatistics]
}
class UpdateStatistics extends Actor with akka.actor.ActorLogging {
import UpdateStatistics._
def receive = {
case QueryController(id) =>
import context.dispatcher
log.info(s"Receiving request to query controller")
Future { FlowCollector.getSwitchFlows(1) } onComplete {
f => self ! f.get
}
case Stop =>
log.info(s"Shuting down")
context stop self
case json: JValue =>
log.info("Getting json response, computing features...")
val features = FeatureExtractor.getFeatures(json)
log.debug(s"Features: $features")
sender ! features
case x =>
log.warning("Received unknown message: {}", x)
}
}
我想做的是从 UpdateStatistics
演员那里获取 json:Jvalue
消息。阅读 Akka docs 我认为这可能有用:
implicit val i = inbox()
i.select() {
case x => println(s"Valor Devuelto $x")
}
println(i receive(2.second))
但我不知道如何修改UpdateStatistics
演员才能将结果发送到上面的收件箱。
我在文档中读到的另一个选项是 event streams。
但我认为这不是正确的方法。
有没有办法实现我想做的事情?或者我是否需要使用第二个 Actor
来发送 JSON
响应?
您可能正在寻找 AKKA 中的 ask
模式。这将允许您 return 向发件人发送一个值。
import akka.pattern.ask
import akka.util.duration._
implicit val timeout = Timeout(5 seconds)
val future = actor ? QueryController(1)
val result = Await.result(future, timeout.duration).asInstanceOf[JValue]
println(result)
要完成这项工作,您需要将响应发送到原始 sender
,而不是 self
。此外,您应该注意将来在处理消息时关闭 sender
的危险。
我构建了一个定期查询 API 的 Akka actor,如下所示:
val cancellable =
system.scheduler.schedule(0 milliseconds,
5 seconds,
actor,
QueryController(1))
Actor,本质上是:
object UpdateStatistics {
/**
* Query the controller for the given switch Id
*
* @param dpId Switch's Id
*/
case class QueryController(dpId: Int)
case object Stop
def props: Props = Props[UpdateStatistics]
}
class UpdateStatistics extends Actor with akka.actor.ActorLogging {
import UpdateStatistics._
def receive = {
case QueryController(id) =>
import context.dispatcher
log.info(s"Receiving request to query controller")
Future { FlowCollector.getSwitchFlows(1) } onComplete {
f => self ! f.get
}
case Stop =>
log.info(s"Shuting down")
context stop self
case json: JValue =>
log.info("Getting json response, computing features...")
val features = FeatureExtractor.getFeatures(json)
log.debug(s"Features: $features")
sender ! features
case x =>
log.warning("Received unknown message: {}", x)
}
}
我想做的是从 UpdateStatistics
演员那里获取 json:Jvalue
消息。阅读 Akka docs 我认为这可能有用:
implicit val i = inbox()
i.select() {
case x => println(s"Valor Devuelto $x")
}
println(i receive(2.second))
但我不知道如何修改UpdateStatistics
演员才能将结果发送到上面的收件箱。
我在文档中读到的另一个选项是 event streams。
但我认为这不是正确的方法。
有没有办法实现我想做的事情?或者我是否需要使用第二个 Actor
来发送 JSON
响应?
您可能正在寻找 AKKA 中的 ask
模式。这将允许您 return 向发件人发送一个值。
import akka.pattern.ask
import akka.util.duration._
implicit val timeout = Timeout(5 seconds)
val future = actor ? QueryController(1)
val result = Await.result(future, timeout.duration).asInstanceOf[JValue]
println(result)
要完成这项工作,您需要将响应发送到原始 sender
,而不是 self
。此外,您应该注意将来在处理消息时关闭 sender
的危险。