Akka:向远程 actor 发送闭包,以及在 Scala Akka 远程 actor 之间发送消息(case class)时出错
akka sending a closure to remote actor + Error in sending messages (case class) between Scala Akka remote actors
新查询:我正在尝试将 DSum() 作为参数从 localActor 传递给 RemoteActor,DSum() 将在远程节点进行一些计算。我无法将其发送给 RemoteActor。有可能吗??(下面的代码)
已解决:我正在尝试连接远程 actor 和本地 actor,并使用 case class 发送对象,但远程端无法匹配到消息类 Common.Message(msg);从 localActor 发送时,远程端总是落入 "case _ => println("Received unknown msg from local ")" 分支。
1.package.scala
/**
 * Message and data definitions imported by both the local and the remote
 * actor (each side does `import check._`).
 */
package object check {

  /** Marker trait implemented by the context classes below. */
  trait Context

  /** Tells the local actor to send its first `Message` to the remote actor. */
  case object Start

  /** Text message exchanged between the two actors. */
  case class Message(msg: String)

  /** Wraps a list of `CxtA` values so it can be sent as one message. */
  case class CxtDA(cxtA: List[CxtA])

  /** Root context holding lists of `CxtA` and `CxtB` plus two scalar fields. */
  case class RCxt(var cxtA: List[CxtA], var cxtB: List[CxtB], var v1: Int, var v2: String) extends Context

  /** Context holding a list of `CxtC` children and an integer value. */
  case class CxtA(var cxtC: List[CxtC], var v1: Int) extends Context

  /** Context holding a list of `CxtC` children and an integer value. */
  case class CxtB(var cxtC: List[CxtC], var v1: Int) extends Context

  /** Leaf context: a string and an integer. */
  case class CxtC(var v1: String, var v2: Int) extends Context

  /**
   * Carries a task instance from one actor to another.
   *
   * The parameter must be declared with a type. The original `DSum()` is a
   * constructor call, which is not valid in type position and does not compile.
   * `DSum` is declared in package `com.akka.local`, so it is referenced by its
   * fully qualified name here.
   * NOTE(review): the class must also be on the classpath of the receiving
   * JVM for the message to be deserialized — confirm the build layout.
   */
  case class Task(var t1: com.akka.local.DSum)
}
2. Remote Actor
package com.akka.remote
import java.io.File
import akka.actor._
import com.typesafe.config.ConfigFactory
import check._
/**
 * Actor created under the name `remote` in the "RemoteSystem" actor system.
 *
 * Handled messages:
 *  - `String`: printed together with the sender, answered with `"hi"`.
 *  - `Message`: printed, answered with `Message("Hello from server")`.
 *  - `CxtDA`: printed only.
 *  - `Task`: only acknowledged on stdout; the carried task is not executed.
 *  - anything else: reported as `unknown msg`.
 *
 * NOTE(review): the listening port (5150 according to the original comment)
 * comes from `remote_application.conf`, not from this class — confirm there.
 */
class RemoteActor extends Actor {

  // Fixed copy-paste slip: this is the remote actor, but the original text
  // said "Local" (identical to LocalActor.toString).
  override def toString: String = "You printed the Remote"

  def receive: Receive = {
    case msg: String =>
      println(s"remote received $msg from ${sender()}")
      sender() ! "hi"

    case Message(msg) =>
      println(s"RemoteActor received message $msg")
      sender() ! Message("Hello from server")

    case CxtDA(cxtA) =>
      println(s"cxtA $cxtA")

    case Task(_) =>
      // The task payload is not used yet; only its arrival is reported.
      println("recieved closure")

    case _ =>
      println("unknown msg")
  }
}
/** Entry point that boots the remote side of the example. */
object RemoteActor {

  /**
   * Starts the "RemoteSystem" actor system configured from
   * `remote_application.conf`, creates the `remote` actor in it and sends
   * that actor a first `Message`.
   *
   * @param args command-line arguments; not used
   * @throws RuntimeException if the configuration resource is not on the classpath
   */
  def main(args: Array[String]): Unit = {
    val resourceName = "remote_application.conf"

    // getResource returns null for a missing resource; fail with a clear
    // message instead of a bare NullPointerException.
    val configUrl = Option(getClass.getClassLoader.getResource(resourceName))
      .getOrElse(sys.error(s"$resourceName not found on the classpath"))

    // Parse directly from the URL. Going through URL.getFile + java.io.File
    // keeps percent-encoded characters (e.g. spaces) in the path and cannot
    // address a resource packaged inside a jar.
    val config = ConfigFactory.parseURL(configUrl)

    val system = ActorSystem("RemoteSystem", config)
    val remoteActor = system.actorOf(Props[RemoteActor], name = "remote")
    println("remote is ready")
    remoteActor ! Message("Hello from active remote")
  }
}
3.Local Actor
package com.akka.local
import java.io.File
import akka.actor.{Props, Actor, ActorSystem}
import com.typesafe.config.ConfigFactory
import check._
import scala.util.Random
/**
* A unit of work over one kind of `Context`.
*
* Implementations fix `CxtT` to a concrete context type and provide `work`,
* which takes a context of that type and returns one of the same type.
*/
trait CxtTask {
// Concrete context type this task operates on; must be a subtype of Context.
type CxtT <: Context
// Processes `ctx` and returns the resulting context.
def work(ctx: CxtT): CxtT
}
/**
 * Task over `CxtA` that replaces the context's children by the sum of their
 * `v2` values. Marked `Serializable`.
 */
class DSum extends CxtTask with Serializable {
  override type CxtT = CxtA

  /**
   * Adds up `v2` over `ctx.cxtC`, then mutates `ctx` in place: `cxtC` becomes
   * the empty list and `v1` becomes the sum. Prints the updated context.
   *
   * @param ctx context to collapse; modified by this call
   * @return the same `ctx` instance after modification
   */
  def work(ctx: CxtA): CxtA = {
    val total = ctx.cxtC.map(_.v2).sum
    ctx.cxtC = Nil
    ctx.v1 = total
    println(s"ctx: $ctx")
    ctx
  }
}
/**
 * Actor on the client side that talks to the remote actor selected at
 * `akka.tcp://RemoteSystem@127.0.0.1:5150/user/remote`.
 *
 * On construction it prints the selection and sends it `"hi"`.
 *
 * Handled messages:
 *  - `String`: printed.
 *  - `Start`: sends `Message("Hello from the LocalActor")` to the remote actor.
 *  - `Message`: printed; the first five are answered with
 *    `Message("Hello back to you")`.
 *  - `CxtDA` / `Task`: re-wrapped and passed on to the remote actor.
 */
class LocalActor extends Actor {
  // import Common._

  // NOTE(review): a @throws annotation on a field has no visible purpose
  // here; kept as in the original.
  @throws[Exception](classOf[Exception])
  val remoteActor = context.actorSelection("akka.tcp://RemoteSystem@127.0.0.1:5150/user/remote")
  println(s"That 's remote:$remoteActor")
  remoteActor ! "hi"

  // Number of replies already sent in response to incoming Message values.
  var counter = 0

  override def toString: String = "You printed the Local"

  def receive = {
    case text: String =>
      println(s"got message from remote$text")

    case Start =>
      println(s"inside Start.local $remoteActor")
      remoteActor ! Message("Hello from the LocalActor")

    case Message(text) =>
      println(s"LocalActor received message: $text")
      // Stop answering after five replies.
      if (counter < 5) {
        sender() ! Message("Hello back to you")
        counter += 1
      }

    case CxtDA(contexts) =>
      remoteActor ! CxtDA(contexts)

    case Task(task) =>
      remoteActor ! Task(task)
  }
}
/** Entry point that boots the client side of the example. */
object LocalActor {

  /**
   * Starts the "ClientSystem" actor system configured from
   * `local_application.conf`, creates the `local` actor and sends it `Start`.
   * It then builds a random context tree, runs a `DSum` over every `CxtA` in
   * it on this JVM, and finally sends the local actor a `Task` holding a new
   * `DSum`.
   *
   * @param args command-line arguments; not used
   * @throws RuntimeException if the configuration resource is not on the classpath
   */
  def main(args: Array[String]): Unit = {
    val resourceName = "local_application.conf"

    // getResource returns null for a missing resource; fail with a clear
    // message instead of a bare NullPointerException.
    val configUrl = Option(getClass.getClassLoader.getResource(resourceName))
      .getOrElse(sys.error(s"$resourceName not found on the classpath"))

    // Parse directly from the URL. Going through URL.getFile + java.io.File
    // keeps percent-encoded characters (e.g. spaces) in the path and cannot
    // address a resource packaged inside a jar.
    val config = ConfigFactory.parseURL(configUrl)

    val system = ActorSystem("ClientSystem", config)
    val localActor = system.actorOf(Props[LocalActor], name = "local")
    localActor ! Start

    // Builders for random test data; each produces `count` elements.
    def createRndCxtC(count: Int): List[CxtC] =
      List.fill(count)(CxtC(Random.nextString(5), 3))

    def createRndCxtB(count: Int): List[CxtB] =
      List.fill(count)(CxtB(createRndCxtC(count), Random.nextInt()))

    def createRndCxtA(count: Int): List[CxtA] =
      List.fill(count)(CxtA(createRndCxtC(count), Random.nextInt()))

    val tree = RCxt(createRndCxtA(2), createRndCxtB(2), 1, "")

    // Run the task locally on every CxtA; `work` mutates each context in place.
    val workA = new DSum()
    tree.cxtA.foreach(ctxA => workA.work(ctxA))

    localActor ! Task(new DSum())
  }
}
[Remote actor output][1]
[1]: https://i.stack.imgur.com/mtmvU.jpg
这里的关键是,你为两个 actor 分别定义了各自的协议:
- 位于 RemoteActor.scala 文件中的 Common 对象
- 位于 LocalActor.scala 文件中的 Common 对象
因此,当你在本地 actor 中发送 Common.Message 时,实际创建的消息与远程 actor 所认识的 Common.Message 并不是同一个类型,所以远程 actor 无法处理它。
作为 Akka 中的一个良好实践,如果某个 actor 有自己专用的消息协议,就应该把该协议定义在它的伴生对象中。但是,如果有多个 actor 共享同一协议(它们的行为正是由处理这些类型的消息来定义的),就应该把该协议放进一个公共对象,并在各个 actor 中导入它。
希望对您有所帮助。
新查询:我正在尝试将 DSum() 作为参数从 localActor 传递给 RemoteActor,DSum() 将在远程节点进行一些计算。我无法将其发送给 RemoteActor。有可能吗??(下面的代码)
已解决:我正在尝试连接远程 actor 和本地 actor,并使用 case class 发送对象,但远程端无法匹配到消息类 Common.Message(msg);从 localActor 发送时,远程端总是落入 "case _ => println("Received unknown msg from local ")" 分支。
1.package.scala
/**
 * Message and data definitions imported by both the local and the remote
 * actor (each side does `import check._`).
 */
package object check {

  /** Marker trait implemented by the context classes below. */
  trait Context

  /** Tells the local actor to send its first `Message` to the remote actor. */
  case object Start

  /** Text message exchanged between the two actors. */
  case class Message(msg: String)

  /** Wraps a list of `CxtA` values so it can be sent as one message. */
  case class CxtDA(cxtA: List[CxtA])

  /** Root context holding lists of `CxtA` and `CxtB` plus two scalar fields. */
  case class RCxt(var cxtA: List[CxtA], var cxtB: List[CxtB], var v1: Int, var v2: String) extends Context

  /** Context holding a list of `CxtC` children and an integer value. */
  case class CxtA(var cxtC: List[CxtC], var v1: Int) extends Context

  /** Context holding a list of `CxtC` children and an integer value. */
  case class CxtB(var cxtC: List[CxtC], var v1: Int) extends Context

  /** Leaf context: a string and an integer. */
  case class CxtC(var v1: String, var v2: Int) extends Context

  /**
   * Carries a task instance from one actor to another.
   *
   * The parameter must be declared with a type. The original `DSum()` is a
   * constructor call, which is not valid in type position and does not compile.
   * `DSum` is declared in package `com.akka.local`, so it is referenced by its
   * fully qualified name here.
   * NOTE(review): the class must also be on the classpath of the receiving
   * JVM for the message to be deserialized — confirm the build layout.
   */
  case class Task(var t1: com.akka.local.DSum)
}
2. Remote Actor
package com.akka.remote
import java.io.File
import akka.actor._
import com.typesafe.config.ConfigFactory
import check._
/**
 * Actor created under the name `remote` in the "RemoteSystem" actor system.
 *
 * Handled messages:
 *  - `String`: printed together with the sender, answered with `"hi"`.
 *  - `Message`: printed, answered with `Message("Hello from server")`.
 *  - `CxtDA`: printed only.
 *  - `Task`: only acknowledged on stdout; the carried task is not executed.
 *  - anything else: reported as `unknown msg`.
 *
 * NOTE(review): the listening port (5150 according to the original comment)
 * comes from `remote_application.conf`, not from this class — confirm there.
 */
class RemoteActor extends Actor {

  // Fixed copy-paste slip: this is the remote actor, but the original text
  // said "Local" (identical to LocalActor.toString).
  override def toString: String = "You printed the Remote"

  def receive: Receive = {
    case msg: String =>
      println(s"remote received $msg from ${sender()}")
      sender() ! "hi"

    case Message(msg) =>
      println(s"RemoteActor received message $msg")
      sender() ! Message("Hello from server")

    case CxtDA(cxtA) =>
      println(s"cxtA $cxtA")

    case Task(_) =>
      // The task payload is not used yet; only its arrival is reported.
      println("recieved closure")

    case _ =>
      println("unknown msg")
  }
}
/** Entry point that boots the remote side of the example. */
object RemoteActor {

  /**
   * Starts the "RemoteSystem" actor system configured from
   * `remote_application.conf`, creates the `remote` actor in it and sends
   * that actor a first `Message`.
   *
   * @param args command-line arguments; not used
   * @throws RuntimeException if the configuration resource is not on the classpath
   */
  def main(args: Array[String]): Unit = {
    val resourceName = "remote_application.conf"

    // getResource returns null for a missing resource; fail with a clear
    // message instead of a bare NullPointerException.
    val configUrl = Option(getClass.getClassLoader.getResource(resourceName))
      .getOrElse(sys.error(s"$resourceName not found on the classpath"))

    // Parse directly from the URL. Going through URL.getFile + java.io.File
    // keeps percent-encoded characters (e.g. spaces) in the path and cannot
    // address a resource packaged inside a jar.
    val config = ConfigFactory.parseURL(configUrl)

    val system = ActorSystem("RemoteSystem", config)
    val remoteActor = system.actorOf(Props[RemoteActor], name = "remote")
    println("remote is ready")
    remoteActor ! Message("Hello from active remote")
  }
}
3.Local Actor
package com.akka.local
import java.io.File
import akka.actor.{Props, Actor, ActorSystem}
import com.typesafe.config.ConfigFactory
import check._
import scala.util.Random
/**
* A unit of work over one kind of `Context`.
*
* Implementations fix `CxtT` to a concrete context type and provide `work`,
* which takes a context of that type and returns one of the same type.
*/
trait CxtTask {
// Concrete context type this task operates on; must be a subtype of Context.
type CxtT <: Context
// Processes `ctx` and returns the resulting context.
def work(ctx: CxtT): CxtT
}
/**
 * Task over `CxtA` that replaces the context's children by the sum of their
 * `v2` values. Marked `Serializable`.
 */
class DSum extends CxtTask with Serializable {
  override type CxtT = CxtA

  /**
   * Adds up `v2` over `ctx.cxtC`, then mutates `ctx` in place: `cxtC` becomes
   * the empty list and `v1` becomes the sum. Prints the updated context.
   *
   * @param ctx context to collapse; modified by this call
   * @return the same `ctx` instance after modification
   */
  def work(ctx: CxtA): CxtA = {
    val total = ctx.cxtC.map(_.v2).sum
    ctx.cxtC = Nil
    ctx.v1 = total
    println(s"ctx: $ctx")
    ctx
  }
}
/**
 * Actor on the client side that talks to the remote actor selected at
 * `akka.tcp://RemoteSystem@127.0.0.1:5150/user/remote`.
 *
 * On construction it prints the selection and sends it `"hi"`.
 *
 * Handled messages:
 *  - `String`: printed.
 *  - `Start`: sends `Message("Hello from the LocalActor")` to the remote actor.
 *  - `Message`: printed; the first five are answered with
 *    `Message("Hello back to you")`.
 *  - `CxtDA` / `Task`: re-wrapped and passed on to the remote actor.
 */
class LocalActor extends Actor {
  // import Common._

  // NOTE(review): a @throws annotation on a field has no visible purpose
  // here; kept as in the original.
  @throws[Exception](classOf[Exception])
  val remoteActor = context.actorSelection("akka.tcp://RemoteSystem@127.0.0.1:5150/user/remote")
  println(s"That 's remote:$remoteActor")
  remoteActor ! "hi"

  // Number of replies already sent in response to incoming Message values.
  var counter = 0

  override def toString: String = "You printed the Local"

  def receive = {
    case text: String =>
      println(s"got message from remote$text")

    case Start =>
      println(s"inside Start.local $remoteActor")
      remoteActor ! Message("Hello from the LocalActor")

    case Message(text) =>
      println(s"LocalActor received message: $text")
      // Stop answering after five replies.
      if (counter < 5) {
        sender() ! Message("Hello back to you")
        counter += 1
      }

    case CxtDA(contexts) =>
      remoteActor ! CxtDA(contexts)

    case Task(task) =>
      remoteActor ! Task(task)
  }
}
/** Entry point that boots the client side of the example. */
object LocalActor {

  /**
   * Starts the "ClientSystem" actor system configured from
   * `local_application.conf`, creates the `local` actor and sends it `Start`.
   * It then builds a random context tree, runs a `DSum` over every `CxtA` in
   * it on this JVM, and finally sends the local actor a `Task` holding a new
   * `DSum`.
   *
   * @param args command-line arguments; not used
   * @throws RuntimeException if the configuration resource is not on the classpath
   */
  def main(args: Array[String]): Unit = {
    val resourceName = "local_application.conf"

    // getResource returns null for a missing resource; fail with a clear
    // message instead of a bare NullPointerException.
    val configUrl = Option(getClass.getClassLoader.getResource(resourceName))
      .getOrElse(sys.error(s"$resourceName not found on the classpath"))

    // Parse directly from the URL. Going through URL.getFile + java.io.File
    // keeps percent-encoded characters (e.g. spaces) in the path and cannot
    // address a resource packaged inside a jar.
    val config = ConfigFactory.parseURL(configUrl)

    val system = ActorSystem("ClientSystem", config)
    val localActor = system.actorOf(Props[LocalActor], name = "local")
    localActor ! Start

    // Builders for random test data; each produces `count` elements.
    def createRndCxtC(count: Int): List[CxtC] =
      List.fill(count)(CxtC(Random.nextString(5), 3))

    def createRndCxtB(count: Int): List[CxtB] =
      List.fill(count)(CxtB(createRndCxtC(count), Random.nextInt()))

    def createRndCxtA(count: Int): List[CxtA] =
      List.fill(count)(CxtA(createRndCxtC(count), Random.nextInt()))

    val tree = RCxt(createRndCxtA(2), createRndCxtB(2), 1, "")

    // Run the task locally on every CxtA; `work` mutates each context in place.
    val workA = new DSum()
    tree.cxtA.foreach(ctxA => workA.work(ctxA))

    localActor ! Task(new DSum())
  }
}
[Remote actor output][1]
[1]: https://i.stack.imgur.com/mtmvU.jpg
这里的关键是,你为两个 actor 分别定义了各自的协议:
- 位于 RemoteActor.scala 文件中的 Common 对象
- 位于 LocalActor.scala 文件中的 Common 对象
因此,当你在本地 actor 中发送 Common.Message 时,实际创建的消息与远程 actor 所认识的 Common.Message 并不是同一个类型,所以远程 actor 无法处理它。
作为 Akka 中的一个良好实践,如果某个 actor 有自己专用的消息协议,就应该把该协议定义在它的伴生对象中。但是,如果有多个 actor 共享同一协议(它们的行为正是由处理这些类型的消息来定义的),就应该把该协议放进一个公共对象,并在各个 actor 中导入它。
希望对您有所帮助。