Actor与ExecutionContext的理解

Comprehension of Actor with ExecutionContext

据我了解 Akka 并行性,Actor 使用一个线程来处理每个传入消息。这个线程包含一个状态。事实上,顺序消息不共享此状态。

但是 Actor 可能有一个 ExecutorContext 用于执行来自 Future 的回调。这就是我不再清楚地理解并行性的地方。

例如我们有以下演员:

  class AnyActor(target: ActorRef) extends Actor { 
          implicit val ec: ExecutionContext = context.dispatcher

          def receive = {
               case messageA =>
                    val api = createApi()
                    val furureA: Future[F] = api.callA 
                        api.close()
                    futureA.pipeTo(sender()) 
               case messageB =>
                    val api = createApi()
                    val furureB: Future[F] = api.callB 
                        api.close()
                    futureB.pipeTo(sender()) 
             }
  }

假设,Actor 收到 messageA,Thread1 创建 api 的实例 - 让我们调用“api1”。还有带有 N 个线程的 executionContext。此线程之一用于从 furureA 检索结果。

我不明白的是,这 N 个线程如何与 Thread1 相关联。 ExecutionContext 仅为 Thread1 创建?或者它也是为Thread2创建的(在其中处理了messageB)?

广义地说,调度器上的参与者 运行 从池中选择一个线程,而参与者 Receive 的 运行 从邮​​箱中选择一些消息。通常无法保证 actor 将 运行 在给定线程上(忽略空洞的示例,例如具有单个线程的池,或者总是 运行 在特定线程中分配给定 actor 的调度程序)。

该调度程序也是 Scala ExecutionContext,它允许安排任意任务在其线程池上执行;此类任务包括 Future 回调。

所以在您的 actor 中,当收到 messageA 时会发生什么?

  • 演员调用createApi()并保存
  • 它在 api
  • 上调用 callA 方法
  • 关闭api
  • 它安排在 callA 的结果可用时转发给发件人
  • 它现在已准备好处理另一条消息,可能会也可能不会实际处理另一条消息

这实际上意味着什么取决于 callA 的作用。如果 callA 在执行上下文中调度一个任务,一旦任务被调度并且回调已经安排好,它就会 return 未来;无法保证在 returned 时执行了任务或回调。一旦 future 被 returned,你的 actor 就会关闭 api(所以这可能发生在任务或回调执行的任何时候)。

简而言之,根据 api 的实现方式(您可能无法控制它的实现方式)和实现细节,可以采用以下顺序

  • 线程1(处理messageA)在调度器中设置任务
  • 线程 1 关闭 api 并安排结果通过管道传输
  • 线程2开始执行任务
  • 线程 1 继续处理其他消息
  • Thread2 的任务失败,因为 api 已关闭

简而言之,当混合 Futures 和 actor 时,Akka 中的“单线程幻觉”可以被打破:任意多个线程都可以操纵 actor 的状态。

在这个例子中,因为 Futureland 和 actorland 之间的唯一共享状态是本地处理单个消息,所以还不错:这里有效的一般规则是:

  • 一旦您将可变(例如可关闭)状态从 actor 传递给未来(这包括,除非您可以绝对确定正在发生什么,否则会在 return 是未来的有状态对象上调用一个方法), 演员最好忘记那个物体的存在

然后如何关闭api

好吧,假设 callA 没有对 api 做任何奇怪的事情(比如将实例保存在一些实例池中),在 messageA 完成处理和未来已完成,没有任何内容可以访问 api。因此,最简单也可能是最正确的做法是安排 api 在 future 完成后关闭,按照这些行

val api = createApi()
val futureA: Future[F] = api.callA

futureA.foreach { _ => api.close() }
futureA.pipeTo(sender())