Sending a response outside the Akka actor system

I have a Play (2.4.2, which has akka 2.4.18) application in which I use akka actors to upload files. I have a parent supervisor actor with this hierarchy:

UploadSupervisor ---child---> UploadActor ---child--->
DataWriteActor & MetaWriteActor

The leaf actors MetaWriteActor and DataWriteActor do the actual writing. A very simplified version of my code follows.

First, I have a supervisor actor:

class UploadSupervisor extends Actor {
  val uploadActor = context.actorOf(Props(new UploadActor), "UploadActor")

  override def supervisorStrategy = OneForOneStrategy() {
    case _: Throwable => Restart
  }

  override def receive: Receive = {
    case data: Data => uploadActor ! data
    case meta: MetaInfo => uploadActor ! meta
    //How do I send response outside of actor system?
    case dataSuccess: DataUploadResponse => ??? //Line 10
    case metaSuccess: MetaUploadResponse => ??? //Line 11
  }
}

object UploadSupervisor {
  val uploadSupervisor = Akka.system
    .actorOf(Props(new UploadSupervisor), "UploadSupervisor")
}
//Request & Response case classes
case class Data(content: String)
case class MetaInfo(id: String, createdDate: Timestamp)

case class DataUploadResponse(location: String)
case class MetaUploadResponse(location: String)

UploadActor:

class UploadActor extends Actor {
  val dataWriteActor = context.actorOf(Props(new DataWriteActor), "dataWriteActor")
  val metaWriteActor = context.actorOf(Props(new MetaWriteActor), "UploadActor")

  override def receive = {
    case data: Data => dataWriteActor ! data
    case meta: MetaInfo => metaWriteActor ! meta
    case dataResp: DataUploadResponse => context.parent ! dataResp
    case metaResp: MetaUploadResponse => context.parent ! metaResp
  }
}

DataWriteActor:

class DataWriteActor extends Actor {
  override def receive = {
    case data: Data =>
      //Do the writing
      println("data write completed")
      sender() ! DataUploadResponse("someLocation")
  }
}

MetaWriteActor

class MetaWriteActor extends Actor {
  override def receive = {
    case meta: MetaInfo =>
      //Do the writing
      println("meta info writing completed")
      sender() ! MetaUploadResponse("someOtherLocation")
  }
}

Somewhere outside the actor system:

implicit val timeout = Timeout(10 seconds)
val f1 = (UploadSupervisor.uploadSupervisor ? Data("Hello Akka")).mapTo(implicitly[scala.reflect.ClassTag[DataUploadResponse]])

val f2 = (UploadSupervisor.uploadSupervisor ? MetaInfo("1234", new Timestamp(new Date().getTime))).mapTo(implicitly[scala.reflect.ClassTag[MetaUploadResponse]])

//Do something with futures

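The question is: how do I send the response outside the actor system? At the lines marked Line 10 and Line 11 I cannot use sender ! msg, because the current sender there is UploadActor.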

You could have UploadSupervisor keep a reference to the initial sender:

class UploadSupervisor extends Actor {
  val uploadActor = context.actorOf(Props[UploadActor], "UploadActor")

  override val supervisorStrategy = OneForOneStrategy() {
    case _ => Restart
  }

  var dataSender: Option[ActorRef] = None
  var metaSender: Option[ActorRef] = None

  def receive = {
    case data: Data =>
      val s = sender
      dataSender = Option(s)
      uploadActor ! data
    case meta: MetaInfo =>
      val s = sender
      metaSender = Option(s)
      uploadActor ! meta
    case dataSuccess: DataUploadResponse =>
      dataSender.foreach(_ ! dataSuccess)
    case metaSuccess: MetaUploadResponse =>
      metaSender.foreach(_ ! metaSuccess)
  }
}

To send messages to UploadSupervisor:

implicit val timeout = Timeout(10 seconds)

val f1 = (UploadSupervisor.uploadSupervisor ? Data("Hello Akka")).mapTo[DataUploadResponse]

val f2 = (UploadSupervisor.uploadSupervisor ? MetaInfo("1234", new Timestamp(new Date().getTime))).mapTo[MetaUploadResponse]

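The above assumes that you send only one Data message and one MetaInfo message to UploadSupervisor at a time. If you send multiple Data and MetaInfo messages and expect concurrent replies, this approach breaks down, because each new request overwrites the stored sender before the previous reply has come back. A more general solution is to include a reference to the initial sender in additional case classes that wrap the existing case classes, and to pass this reference through your actor hierarchy: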

case class DataMsg(data: Data, target: ActorRef)
case class MetaInfoMsg(metaInfo: MetaInfo, target: ActorRef)

case class DataUploadMsg(response: DataUploadResponse, target: ActorRef)
case class MetaUploadMsg(response: MetaUploadResponse, target: ActorRef)

class UploadSupervisor extends Actor {
  val uploadActor = context.actorOf(Props[UploadActor], "UploadActor")

  override val supervisorStrategy = OneForOneStrategy() {
    case _ => Restart
  }

  def receive = {
    case data: Data =>
      val s = sender
      uploadActor ! DataMsg(data, s)
    case meta: MetaInfo =>
      val s = sender
      uploadActor ! MetaInfoMsg(meta, s)
    case DataUploadMsg(response, target) =>
      target ! response
    case MetaUploadMsg(response, target) =>
      target ! response
  }
}

UploadActor:

class UploadActor extends Actor {  
  val dataWriteActor = context.actorOf(Props[DataWriteActor], "dataWriteActor")  
  val metaWriteActor = context.actorOf(Props[MetaWriteActor], "UploadActor")

  def receive = {   
    case data: DataMsg => dataWriteActor ! data   
    case meta: MetaInfoMsg => metaWriteActor ! meta   
    case dataResp: DataUploadMsg => context.parent ! dataResp   
    case metaResp: MetaUploadMsg => context.parent ! metaResp 
  }
}

The writers:

class DataWriteActor extends Actor {
  def receive = {
    case DataMsg(data, target) =>
      // do the writing 
      println("data write completed")
      sender ! DataUploadMsg(DataUploadResponse("someLocation"), target)
  }
}

class MetaWriteActor extends Actor {
  def receive = {
    case MetaInfoMsg(meta, target) =>
      // do the writing 
      println("meta info writing completed")
      sender ! MetaUploadMsg(MetaUploadResponse("someOtherLocation"), target) 
  }
}
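
To check that the wrapper approach copes with concurrent requests, here is a minimal self-contained sketch. It is not the original application: MetaInfo is left out for brevity, the supervisor strategy is omitted, and the Demo object, the "demo" system name and the "loc-" location prefix are made up for illustration. It assumes Akka 2.4 or later on the classpath (ActorSystem.terminate() does not exist in 2.3) and a plain ActorSystem rather than Play's Akka.system.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Trimmed-down versions of the messages above (MetaInfo omitted for brevity).
case class Data(content: String)
case class DataUploadResponse(location: String)
case class DataMsg(data: Data, target: ActorRef)
case class DataUploadMsg(response: DataUploadResponse, target: ActorRef)

class DataWriteActor extends Actor {
  def receive = {
    case DataMsg(data, target) =>
      // Hypothetical location derived from the content, so replies can be told apart.
      sender() ! DataUploadMsg(DataUploadResponse("loc-" + data.content), target)
  }
}

class UploadActor extends Actor {
  val dataWriteActor = context.actorOf(Props[DataWriteActor], "dataWriteActor")

  def receive = {
    case msg: DataMsg => dataWriteActor ! msg
    case resp: DataUploadMsg => context.parent ! resp
  }
}

class UploadSupervisor extends Actor {
  val uploadActor = context.actorOf(Props[UploadActor], "UploadActor")

  def receive = {
    // sender() is read synchronously here, so it is the original asker.
    case data: Data => uploadActor ! DataMsg(data, sender())
    // The original asker travelled with the message, so no mutable state is needed.
    case DataUploadMsg(response, target) => target ! response
  }
}

object Demo {
  // Fires three asks without waiting in between and collects the replies in order.
  def run(): Seq[DataUploadResponse] = {
    val system = ActorSystem("demo")
    implicit val timeout: Timeout = Timeout(5.seconds)
    import system.dispatcher
    val supervisor = system.actorOf(Props[UploadSupervisor], "UploadSupervisor")
    try {
      val replies = Future.sequence(
        Seq("a", "b", "c").map(c => (supervisor ? Data(c)).mapTo[DataUploadResponse]))
      Await.result(replies, 5.seconds)
    } finally system.terminate()
  }
}
```

Each ask creates its own temporary reply actor, and because that reference is carried inside DataMsg and DataUploadMsg, every future should be completed with the response to its own request even though the three requests are in flight at the same time. Blocking with Await is only for the demonstration; in a Play controller you would normally map over the future instead.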