结合 Spray 路由(spray-routing)与 Actor 模式匹配
Combining Spray Routing + Actor Pattern Matching
按照 Akka 集群文档,我已经把 Worker Dial-in 示例运行起来了。
http://doc.akka.io/docs/akka/snapshot/java/cluster-usage.html
于是我尝试将它与 Spray 路由(spray-routing)集成。
我的想法是在后台运行一个集群,并通过 HTTP REST 接口来调用该服务。
所以我有以下代码。
/**
 * Application entry point: builds the configuration, starts the "ClusterSystem"
 * actor system, creates the frontend actor and binds it as the HTTP listener.
 *
 * The first command-line argument, when present, is used as the value of
 * `akka.remote.netty.tcp.port`; otherwise "0" is used.
 */
object Boot extends App {
  val port = args.headOption.getOrElse("0")

  // Lookup order: explicit remoting port, then the frontend role, then the regular loaded configuration.
  val config =
    ConfigFactory
      .parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]"))
      .withFallback(ConfigFactory.load())

  val system = ActorSystem("ClusterSystem", config)
  val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")

  // Reuse the cluster system for the HTTP extension. Previously a second, default-configured
  // ActorSystem() was started here only to satisfy the implicit required by IO(Http), so the
  // HTTP layer and the frontend actor ran in two different actor systems.
  // The name `actSystem` is kept so existing references to it still compile.
  implicit val actSystem: ActorSystem = system

  IO(Http) ! Http.Bind(
    frontend,
    interface = config.getString("http.interface"),
    port = config.getInt("http.port"))
}
/**
 * Cluster frontend actor that also acts as the HTTP handler.
 *
 * It keeps the list of registered backends, forwards `TransformationJob`
 * messages to them in round-robin order, and answers `GET /job` requests by
 * asking a backend and replying to the HTTP connection with the result.
 */
class TransformationFrontend extends Actor {
  var backends = IndexedSeq.empty[ActorRef]
  var jobCounter = 0
  implicit val timeout: Timeout = Timeout(5.seconds)

  /**
   * Increments the job counter and picks the backend for that job (round-robin).
   * Callers must make sure `backends` is non-empty, otherwise the modulo divides by zero.
   */
  private def nextBackend(): ActorRef = {
    jobCounter += 1
    backends(jobCounter % backends.size)
  }

  override def receive: Receive = {
    // Register this actor as the handler for every new HTTP connection.
    case _: Http.Connected => sender() ! Http.Register(self)

    // Without this guard the round-robin modulo below throws ArithmeticException
    // when a request arrives before any backend has registered.
    case HttpRequest(GET, Uri.Path("/job"), _, _, _) if backends.isEmpty =>
      sender() ! spray.http.HttpResponse(
        status = spray.http.StatusCodes.ServiceUnavailable,
        entity = "Service unavailable, try again later")

    case HttpRequest(GET, Uri.Path("/job"), _, _, _) =>
      val backend = nextBackend()
      // Capture the sender now: sender() must not be called from the future callback.
      val originalSender = sender()
      val future: Future[TransformationResult] =
        (backend ? new TransformationJob(jobCounter + "-job")).mapTo[TransformationResult]
      future onComplete {
        case Success(s) =>
          println("received from backend: " + s.text)
          // Reply with a proper HTTP response rather than a bare String.
          originalSender ! spray.http.HttpResponse(entity = s.text)
        case Failure(f) =>
          println("error found: " + f.getMessage)
          // Answer the client instead of leaving the request without any reply.
          originalSender ! spray.http.HttpResponse(
            status = spray.http.StatusCodes.InternalServerError,
            entity = "Transformation failed")
      }

    case job: TransformationJob if backends.isEmpty =>
      sender() ! JobFailed("Service unavailable, try again later", job)

    case job: TransformationJob =>
      nextBackend() forward job

    // Only register (and watch) a backend the first time it announces itself.
    case BackendRegistration if !backends.contains(sender()) =>
      println("backend registered")
      context watch sender()
      backends = backends :+ sender()

    // Drop a backend from the rotation once it has terminated.
    case Terminated(a) =>
      backends = backends.filterNot(_ == a)
  }
}
但我真正想做的是把 Spray 路由与这些模式匹配结合起来。
与其像上面那样写我的 GET,我想这样写:
// GET /job: ask a backend to run a new job and complete the request with the
// reply, using application/json as the response media type.
(path("job") & get) {
  respondWithMediaType(`application/json`) {
    complete {
      val job = new TransformationJob(s"$jobCounter-job")
      (backend ? job).mapTo[TransformationResult]
    }
  }
}
但是,如果让我的 Actor 继承提供 runRoute 的那个类(spray-routing 的 HttpService),我就必须这样定义 receive:
// The actor's whole message handler is the one produced from the route.
override def receive: Receive = runRoute(defaultRoute)
如何将此方法与我的 TransformationFrontend Actor 模式匹配方法结合起来? BackendRegistration、Terminated、TransformationJob?
Receive 本身就是一个 PartialFunction,因此您可以用 PartialFunction.orElse 把多个 Receive 组合起来:
// Sketch of an actor that serves a route and still handles its own messages.
class TransformationFrontend extends Actor {
// ...
// Handler for the actor's own messages (e.g. TransformationJob).
def myReceive: Receive = {
case job: TransformationJob => // ...
// ...
}
// The route that is handed to runRoute below.
def defaultRoute: Route =
get {
// ...
}
// orElse chains the two partial functions: the handler built from the route is
// tried first, and myReceive gets every message that handler is not defined for.
override def receive: Receive = runRoute(defaultRoute) orElse myReceive
}
话虽如此,如果可能的话,把这些功能拆分到多个 Actor 中(正如上面评论里所建议的)通常更合理。
按照 Akka 集群文档,我已经把 Worker Dial-in 示例运行起来了。
http://doc.akka.io/docs/akka/snapshot/java/cluster-usage.html
于是我尝试将它与 Spray 路由(spray-routing)集成。
我的想法是在后台运行一个集群,并通过 HTTP REST 接口来调用该服务。
所以我有以下代码。
/**
 * Application entry point: builds the configuration, starts the "ClusterSystem"
 * actor system, creates the frontend actor and binds it as the HTTP listener.
 *
 * The first command-line argument, when present, is used as the value of
 * `akka.remote.netty.tcp.port`; otherwise "0" is used.
 */
object Boot extends App {
  val port = args.headOption.getOrElse("0")

  // Lookup order: explicit remoting port, then the frontend role, then the regular loaded configuration.
  val config =
    ConfigFactory
      .parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]"))
      .withFallback(ConfigFactory.load())

  val system = ActorSystem("ClusterSystem", config)
  val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")

  // Reuse the cluster system for the HTTP extension. Previously a second, default-configured
  // ActorSystem() was started here only to satisfy the implicit required by IO(Http), so the
  // HTTP layer and the frontend actor ran in two different actor systems.
  // The name `actSystem` is kept so existing references to it still compile.
  implicit val actSystem: ActorSystem = system

  IO(Http) ! Http.Bind(
    frontend,
    interface = config.getString("http.interface"),
    port = config.getInt("http.port"))
}
/**
 * Cluster frontend actor that also acts as the HTTP handler.
 *
 * It keeps the list of registered backends, forwards `TransformationJob`
 * messages to them in round-robin order, and answers `GET /job` requests by
 * asking a backend and replying to the HTTP connection with the result.
 */
class TransformationFrontend extends Actor {
  var backends = IndexedSeq.empty[ActorRef]
  var jobCounter = 0
  implicit val timeout: Timeout = Timeout(5.seconds)

  /**
   * Increments the job counter and picks the backend for that job (round-robin).
   * Callers must make sure `backends` is non-empty, otherwise the modulo divides by zero.
   */
  private def nextBackend(): ActorRef = {
    jobCounter += 1
    backends(jobCounter % backends.size)
  }

  override def receive: Receive = {
    // Register this actor as the handler for every new HTTP connection.
    case _: Http.Connected => sender() ! Http.Register(self)

    // Without this guard the round-robin modulo below throws ArithmeticException
    // when a request arrives before any backend has registered.
    case HttpRequest(GET, Uri.Path("/job"), _, _, _) if backends.isEmpty =>
      sender() ! spray.http.HttpResponse(
        status = spray.http.StatusCodes.ServiceUnavailable,
        entity = "Service unavailable, try again later")

    case HttpRequest(GET, Uri.Path("/job"), _, _, _) =>
      val backend = nextBackend()
      // Capture the sender now: sender() must not be called from the future callback.
      val originalSender = sender()
      val future: Future[TransformationResult] =
        (backend ? new TransformationJob(jobCounter + "-job")).mapTo[TransformationResult]
      future onComplete {
        case Success(s) =>
          println("received from backend: " + s.text)
          // Reply with a proper HTTP response rather than a bare String.
          originalSender ! spray.http.HttpResponse(entity = s.text)
        case Failure(f) =>
          println("error found: " + f.getMessage)
          // Answer the client instead of leaving the request without any reply.
          originalSender ! spray.http.HttpResponse(
            status = spray.http.StatusCodes.InternalServerError,
            entity = "Transformation failed")
      }

    case job: TransformationJob if backends.isEmpty =>
      sender() ! JobFailed("Service unavailable, try again later", job)

    case job: TransformationJob =>
      nextBackend() forward job

    // Only register (and watch) a backend the first time it announces itself.
    case BackendRegistration if !backends.contains(sender()) =>
      println("backend registered")
      context watch sender()
      backends = backends :+ sender()

    // Drop a backend from the rotation once it has terminated.
    case Terminated(a) =>
      backends = backends.filterNot(_ == a)
  }
}
但我真正想做的是把 Spray 路由与这些模式匹配结合起来。
与其像上面那样写我的 GET,我想这样写:
// GET /job: ask a backend to run a new job and complete the request with the
// reply, using application/json as the response media type.
(path("job") & get) {
  respondWithMediaType(`application/json`) {
    complete {
      val job = new TransformationJob(s"$jobCounter-job")
      (backend ? job).mapTo[TransformationResult]
    }
  }
}
但是,如果让我的 Actor 继承提供 runRoute 的那个类(spray-routing 的 HttpService),我就必须这样定义 receive:
// The actor's whole message handler is the one produced from the route.
override def receive: Receive = runRoute(defaultRoute)
如何将此方法与我的 TransformationFrontend Actor 模式匹配方法结合起来? BackendRegistration、Terminated、TransformationJob?
Receive 本身就是一个 PartialFunction,因此您可以用 PartialFunction.orElse 把多个 Receive 组合起来:
// Sketch of an actor that serves a route and still handles its own messages.
class TransformationFrontend extends Actor {
// ...
// Handler for the actor's own messages (e.g. TransformationJob).
def myReceive: Receive = {
case job: TransformationJob => // ...
// ...
}
// The route that is handed to runRoute below.
def defaultRoute: Route =
get {
// ...
}
// orElse chains the two partial functions: the handler built from the route is
// tried first, and myReceive gets every message that handler is not defined for.
override def receive: Receive = runRoute(defaultRoute) orElse myReceive
}
话虽如此,如果可能的话,把这些功能拆分到多个 Actor 中(正如上面评论里所建议的)通常更合理。