在 akka Actor 系统之外发送响应
Sending Response outside akka Actor System
我有一个 play(2.4.2 which has akka 2.4.18) 应用程序,我在其中使用 akka actor 上传文件。我有一个具有这种层次结构的父主管Actor
UploadSupervisor ---child---> UploadActor ---child--->
DataWriteActor & MetaWriteActor
叶演员 MetaWriteActor 和 DataWriteActor 进行实际的写作。我的代码的一个非常简化的版本如下:
首先我有一个演员监督:
class UploadSupervisor extends Actor {
val uploadActor = context.actorOf(Props(new UploadActor), "UploadActor")
override def supervisorStrategy = OneForOneStrategy() {
case _: Throwable => Restart
}
override def receive: Receive = {
case data: Data => uploadActor ! data
case meta: MetaInfo => uploadActor ! meta
//How do I send response outside of actor system?
case dataSuccess: DataUploadResponse => ??? //Line 10
case metaSuccess: MetaUploadResponse => ??? //Line 11
}
object UploadSupervisor {
val uploadSupervisor = Akka.system
.actorOf(Props(new UploadSupervisor), "UploadSupervisor")
}
//Request & Response case classes
case class Data(content: String)
case class MetaInfo(id: String, createdDate: Timestamp)
case class DataUploadResponse(location: String)
case class MetaUploadResponse(location: String)
上传演员:-
class UploadActor extends Actor {
val dataWriteActor = context.actorOf(Props(new DataWriteActor), "dataWriteActor")
val metaWriteActor = context.actorOf(Props(new MetaWriteActor), "UploadActor")
override def receive = {
case data: Data => dataWriteActor ! data
case meta: MetaInfo => metaWriteActor ! meta
case dataResp: DataUploadResponse => context.parent ! dataResp
case metaResp: MetaUploadResponse => context.parent ! metaResp
}
}
数据写入演员:
class DataWriteActor extends Actor {
case data: Data => //Do the writing
println("data write completed")
sender() ! DataUploadResponse("someLocation")
}
MetaWriteActor
class MetaWriteActor extends Actor {
case meta: MetaInfo=> //Do the writing
println(" meta info writing completed")
sender() ! MetaUploadResponse("someOtherLocation")
}
Actor 系统之外的某处:-
implicit val timeout = Timeout(10 seconds)
val f1 = UploadSupervisor.uploadSupervisor ? Data("Hello Akka").mapTo(implicitly[scala.reflect.ClassTag[DataUploadResponse]])
val f2 = UploadSupervisor.uploadSupervisor ? MetaInfo("1234", new Timestamp(new Date().getTime).mapTo(implicitly[scala.reflect.ClassTag[MetaUploadResponse]])
//Do something with futures
问题是如何将响应发送到actor系统之外?因为在第 10 和 11 行,我不能使用 sender ! msg 因为当前发件人是 UploadActor.
您可以保留 UploadSupervisor
对初始发件人的引用:
class UploadSupervisor extends Actor {
val uploadActor = context.actorOf(Props[UploadActor], "UploadActor")
override val supervisorStrategy = OneForOneStrategy() {
case _ => Restart
}
var dataSender: Option[ActorRef] = None
var metaSender: Option[ActorRef] = None
def receive = {
case data: Data =>
val s = sender
dataSender = Option(s)
uploadActor ! data
case meta: MetaInfo =>
val s = sender
metaSender = Option(s)
uploadActor ! meta
case dataSuccess: DataUploadResponse =>
dataSender.foreach(_ ! dataSuccess)
case metaSuccess: MetaUploadResponse =>
metaSender.foreach(_ ! metaSuccess)
}
}
发送消息给UploadSupervisor
:
implicit val timeout = Timeout(10 seconds)
val f1 = (UploadSupervisor.uploadSupervisor ? Data("Hello Akka")).mapTo[DataUploadResponse]
val f2 = (UploadSupervisor.uploadSupervisor ? MetaInfo("1234", new Timestamp(new Date().getTime)).mapTo[MetaUploadResponse]
以上假定您一次向 UploadSupervisor
发送一条 Data
消息和一条 MetaInfo
消息。如果您发送多个 Data
和 MetaInfo
消息并期望同时回复,则此方法将失效。一个更通用的解决方案是在包装现有案例 类 的附加案例 类 中包含对初始发件人的引用,通过您的参与者层次结构传递此引用:
case class DataMsg(data: Data, target: ActorRef)
case class MetaInfoMsg(metaInfo: MetaInfo, target: ActorRef)
case class DataUploadMsg(response: DataUploadResponse, target: ActorRef)
case class MetaUploadMsg(response: MetaUploadResponse, target: ActorRef)
class UploadSupervisor extends Actor {
val uploadActor = context.actorOf(Props[UploadActor], "UploadActor")
override val supervisorStrategy = OneForOneStrategy() {
case _ => Restart
}
def receive = {
case data: Data =>
val s = sender
uploadActor ! DataMsg(data, s)
case meta: MetaInfo =>
val s = sender
uploadActor ! MetaInfoMsg(meta, s)
case DataUploadMsg(response, target) =>
target ! response
case MetaUploadMsg(response, target) =>
target ! response
}
}
UploadActor
:
class UploadActor extends Actor {
val dataWriteActor = context.actorOf(Props[DataWriteActor], "dataWriteActor")
val metaWriteActor = context.actorOf(Props[MetaWriteActor], "UploadActor")
def receive = {
case data: DataMsg => dataWriteActor ! data
case meta: MetaInfoMsg => metaWriteActor ! meta
case dataResp: DataUploadMsg => context.parent ! dataResp
case metaResp: MetaUploadMsg => context.parent ! metaResp
}
}
作者:
class DataWriteActor extends Actor {
def receive = {
case DataMsg(data, target) =>
// do the writing
println("data write completed")
sender ! DataUploadMsg(DataUploadResponse("someLocation"), target)
}
}
class MetaWriteActor extends Actor {
def receive = {
case MetaInfoMsg(meta, target) =>
// do the writing
println("meta info writing completed")
sender ! MetaUploadMsg(MetaUploadResponse("someOtherLocation"), target)
}
}
我有一个 play(2.4.2 which has akka 2.4.18) 应用程序,我在其中使用 akka actor 上传文件。我有一个具有这种层次结构的父主管Actor
UploadSupervisor ---child---> UploadActor ---child--->
DataWriteActor & MetaWriteActor
叶演员 MetaWriteActor 和 DataWriteActor 进行实际的写作。我的代码的一个非常简化的版本如下:
首先我有一个演员监督:
class UploadSupervisor extends Actor {
val uploadActor = context.actorOf(Props(new UploadActor), "UploadActor")
override def supervisorStrategy = OneForOneStrategy() {
case _: Throwable => Restart
}
override def receive: Receive = {
case data: Data => uploadActor ! data
case meta: MetaInfo => uploadActor ! meta
//How do I send response outside of actor system?
case dataSuccess: DataUploadResponse => ??? //Line 10
case metaSuccess: MetaUploadResponse => ??? //Line 11
}
object UploadSupervisor {
val uploadSupervisor = Akka.system
.actorOf(Props(new UploadSupervisor), "UploadSupervisor")
}
//Request & Response case classes
case class Data(content: String)
case class MetaInfo(id: String, createdDate: Timestamp)
case class DataUploadResponse(location: String)
case class MetaUploadResponse(location: String)
上传演员:-
class UploadActor extends Actor {
val dataWriteActor = context.actorOf(Props(new DataWriteActor), "dataWriteActor")
val metaWriteActor = context.actorOf(Props(new MetaWriteActor), "UploadActor")
override def receive = {
case data: Data => dataWriteActor ! data
case meta: MetaInfo => metaWriteActor ! meta
case dataResp: DataUploadResponse => context.parent ! dataResp
case metaResp: MetaUploadResponse => context.parent ! metaResp
}
}
数据写入演员:
class DataWriteActor extends Actor {
case data: Data => //Do the writing
println("data write completed")
sender() ! DataUploadResponse("someLocation")
}
MetaWriteActor
class MetaWriteActor extends Actor {
case meta: MetaInfo=> //Do the writing
println(" meta info writing completed")
sender() ! MetaUploadResponse("someOtherLocation")
}
Actor 系统之外的某处:-
implicit val timeout = Timeout(10 seconds)
val f1 = UploadSupervisor.uploadSupervisor ? Data("Hello Akka").mapTo(implicitly[scala.reflect.ClassTag[DataUploadResponse]])
val f2 = UploadSupervisor.uploadSupervisor ? MetaInfo("1234", new Timestamp(new Date().getTime).mapTo(implicitly[scala.reflect.ClassTag[MetaUploadResponse]])
//Do something with futures
问题是如何将响应发送到actor系统之外?因为在第 10 和 11 行,我不能使用 sender ! msg 因为当前发件人是 UploadActor.
您可以保留 UploadSupervisor
对初始发件人的引用:
class UploadSupervisor extends Actor {
val uploadActor = context.actorOf(Props[UploadActor], "UploadActor")
override val supervisorStrategy = OneForOneStrategy() {
case _ => Restart
}
var dataSender: Option[ActorRef] = None
var metaSender: Option[ActorRef] = None
def receive = {
case data: Data =>
val s = sender
dataSender = Option(s)
uploadActor ! data
case meta: MetaInfo =>
val s = sender
metaSender = Option(s)
uploadActor ! meta
case dataSuccess: DataUploadResponse =>
dataSender.foreach(_ ! dataSuccess)
case metaSuccess: MetaUploadResponse =>
metaSender.foreach(_ ! metaSuccess)
}
}
发送消息给UploadSupervisor
:
implicit val timeout = Timeout(10 seconds)
val f1 = (UploadSupervisor.uploadSupervisor ? Data("Hello Akka")).mapTo[DataUploadResponse]
val f2 = (UploadSupervisor.uploadSupervisor ? MetaInfo("1234", new Timestamp(new Date().getTime)).mapTo[MetaUploadResponse]
以上假定您一次向 UploadSupervisor
发送一条 Data
消息和一条 MetaInfo
消息。如果您发送多个 Data
和 MetaInfo
消息并期望同时回复,则此方法将失效。一个更通用的解决方案是在包装现有案例 类 的附加案例 类 中包含对初始发件人的引用,通过您的参与者层次结构传递此引用:
case class DataMsg(data: Data, target: ActorRef)
case class MetaInfoMsg(metaInfo: MetaInfo, target: ActorRef)
case class DataUploadMsg(response: DataUploadResponse, target: ActorRef)
case class MetaUploadMsg(response: MetaUploadResponse, target: ActorRef)
class UploadSupervisor extends Actor {
val uploadActor = context.actorOf(Props[UploadActor], "UploadActor")
override val supervisorStrategy = OneForOneStrategy() {
case _ => Restart
}
def receive = {
case data: Data =>
val s = sender
uploadActor ! DataMsg(data, s)
case meta: MetaInfo =>
val s = sender
uploadActor ! MetaInfoMsg(meta, s)
case DataUploadMsg(response, target) =>
target ! response
case MetaUploadMsg(response, target) =>
target ! response
}
}
UploadActor
:
class UploadActor extends Actor {
val dataWriteActor = context.actorOf(Props[DataWriteActor], "dataWriteActor")
val metaWriteActor = context.actorOf(Props[MetaWriteActor], "UploadActor")
def receive = {
case data: DataMsg => dataWriteActor ! data
case meta: MetaInfoMsg => metaWriteActor ! meta
case dataResp: DataUploadMsg => context.parent ! dataResp
case metaResp: MetaUploadMsg => context.parent ! metaResp
}
}
作者:
class DataWriteActor extends Actor {
def receive = {
case DataMsg(data, target) =>
// do the writing
println("data write completed")
sender ! DataUploadMsg(DataUploadResponse("someLocation"), target)
}
}
class MetaWriteActor extends Actor {
def receive = {
case MetaInfoMsg(meta, target) =>
// do the writing
println("meta info writing completed")
sender ! MetaUploadMsg(MetaUploadResponse("someOtherLocation"), target)
}
}