使用 Futures 的 actors 内的并发

Concurrency within actors using Futures

我想知道是否有更好的方法来处理 Actor 中值的异步初始化。 Actor 在 actor 内部时当然是线程安全的,但是使用 Futures 会带来麻烦(并且你必须确保你不会关闭超过 contextsender)考虑以下几点:

class ExampleActor(ref1: ActorRef, ref2: ActorRef) extends Actor {

  implicit val ec = context.dispatcher

  val promise1 = Promise[Int]
  val promise2 = Promise[Int]

  def receive = {
    case Request1.Response(x) => promise1.success(x)
    case Request2.Response(y) => promise2.success(y)
    case CombinedResponse(x, y) => x + y
  }

  promise1.future foreach { x =>
    promise2.future foreach { y =>
      self ! CombinedResponse(x, y) 
    }
  }

  ref1 ! Request1
  ref2 ! Request2
}

是否有 better/more 惯用的方式来处理像这样的并行请求?

您实际上不需要期货来处理多部分响应:

var x: Option[Int] = None
var y: Option[Int] = None

def receive = {
  case Request1.Response(x) => x = Some(x); checkParts
  case Request2.Response(y) => y = Some(y); checkParts
}

def checkParts = for {
   xx <- x
   yy <- y
} parent ! xx + yy

顺便说一下,即使是 futures,你也可以用同样的方式使用 for-comprehension。

管理 actor 状态的更实用的方法:

case class Resp1(x: Int)
case class Resp2(y: Int)
case class State(x: Option[Int], y: Option[Int])

class Worker(parent: ActorRef) extends Actor {
  def receive = process(State(None, None))

  def process(s: State): Receive = edge(s) andThen { sn => 
    context become process(sn)
    for {
       xx <- sn.x
       yy <- sn.y
    } parent ! xx + yy //action
  }

  def edge(s: State): PartialFunction[Any, State] = { //managing state
    case Resp1(x) => s.copy(x = Some(x))
    case Resp2(y) => s.copy(y = Some(y))
  }

}

重用 actor 而不是创建 future 更好,因为 promise.success 实际上通过将任务提交给执行程序来产生不可管理的副作用,因此这不是一种纯粹的功能方式。 Actor 的状态更好,因为 actor 内部的副作用始终与其他 actor 的一致 - 它们是逐步应用的,并且仅响应某些消息。所以你可能会看到演员就像 fold 在无限集合上; actor 发送的状态和消息(也是无限的)可以看作是 fold 的累加器。

说到 Akka,它的 actor 有一些 IoC 特性,比如自动异常处理(通过监督),这在未来是不可用的。在您的情况下,您必须将 return 的附加复合消息引入到演员的 IoC 上下文中。添加除 self ! CombinedResponse(x, y) 之外的任何其他操作(例如,某些其他开发人员可能会不小心执行某些操作以实施某些解决方法)是不安全的。