在开始主逻辑之前处理 Actor 需求

Handle Actor requirements before starting main logic

看下图:

在这个场景中,我们有两个经理。 ManagerA和ManagerB,都是由同一个父亲同时产生的。 ManagerB 也立即产生 3 children 开始处理一些数据。当工作人员完成处理并获得结果时,他将结果发回给 ManagerB。

ManagerB 然后通知 ManagerA 他有一个 object 的 B1 就绪。 ManagerA 然后告诉 A1、A2 和 A3 B1 已准备好并传递给他们。但是只有 A1 和 A2 需要 B1,所以他们保留它,A3 丢弃该消息。由于 A1 拥有 B1,因此他现在拥有开始执行其主要逻辑的所有要求,所以他这样做了。同时A2还需要B2,A3需要B3。

如何在 Scala 中使用 akka actor 实现这样的逻辑?我很难找到一种方法来以功能方式满足需求并在最终满足所有需求后开始执行。

我要使用的基本思想是在一个案例 class 中对状态进行编码,其中至少包含一组要求和一个映射,其键是要求的(非严格)子集,其值是 B 工作人员的结果,例如对于 A1,我们可能有:

requirements = Set("B1")
received = Map("B1" -> B1Result(...))

您的 actor 的行为是其状态的函数:改变状态的消息会改变行为。

更具体地说,您可以这样做:

object AWorker {
  type BResult = Any  // BResult would probably defined elsewhere, I'm aliasing Any for brevity

  type Action = (ActorContext[Command], Map[String, BResult]) => Unit

  sealed trait Command

  case class RequirementsAndDo(
    requirements: Set[String],
    action: Action,
    replyTo: Option[ActorRef[Reply]]
  ) extends Reply

  case class Completed(key: String, value: BResult, replyTo: Option[ActorRef[Reply]]) extends Reply

  sealed trait Reply
  case object Ack extends Reply

  def apply(): Behavior[Command] = withState(State())

  private[AWorker] case class State(
    requirements: Set[String] = Set.empty,
    received: Map[String, BResult] = Map.empty,
    whenRequirementsMet: Option[Action] = None
  ) {
    def updateRequirements(req: Set[String], f: Option[Action]): State = {
      // For now, start anew...
      State(req)
    }

    def completeRequirement(req: String, v: BResult): State =
      copy(received = received.updated(req, v))

    def checkAndPerform(ctx: ActorContext[Command]): State = {
      if (requirements.diff(received.keySet).isEmpty) {
        whenRequirementsMet.foreach { a =>
          a(ctx, received)
        }
        State()
      } else this
  }

  private def withState(state: State): Behavior[Command] =
    Behaviors.receive { (context, msg) =>
      msg match {
        case RequirementsAndDo(req, action, replyTo) =>
          val nextState = state.updateRequirements(req, action)
          replyTo.foreach(_ ! Ack)
          withState(nextState)
        case Completed(key, value, replyTo) =>
          val intermediateState = state.completeRequirement(key, value)
          replyTo.foreach(_ ! Ack)
          withState(intermediateState.checkAndPerform(context))
      }
    }
}

您可以通过 State return 效果元组和新状态上的方法使它更加 FP,但是这个 IMO 接近最佳实用功能。请特别注意,这鼓励将领域影响(例如围绕工作完成)与实现协议影响(例如回复)分开。