Actor与ExecutionContext的理解
Comprehension of Actor with ExecutionContext
据我了解 Akka 并行性,Actor 使用一个线程来处理每个传入消息。这个线程包含一个状态。事实上,顺序消息不共享此状态。
但是 Actor 可能有一个 ExecutorContext 用于执行来自 Future 的回调。这就是我不再清楚地理解并行性的地方。
例如我们有以下演员:
class AnyActor(target: ActorRef) extends Actor {
implicit val ec: ExecutionContext = context.dispatcher
def receive = {
case messageA =>
val api = createApi()
val furureA: Future[F] = api.callA
api.close()
futureA.pipeTo(sender())
case messageB =>
val api = createApi()
val furureB: Future[F] = api.callB
api.close()
futureB.pipeTo(sender())
}
}
假设,Actor 收到 messageA,Thread1 创建 api 的实例 - 让我们调用“api1”。还有带有 N 个线程的 executionContext。此线程之一用于从 furureA 检索结果。
我不明白的是,这 N 个线程如何与 Thread1 相关联。 ExecutionContext 仅为 Thread1 创建?或者它也是为Thread2创建的(在其中处理了messageB)?
广义地说,调度器上的参与者 运行 从池中选择一个线程,而参与者 Receive
的 运行 从邮箱中选择一些消息。通常无法保证 actor 将 运行 在给定线程上(忽略空洞的示例,例如具有单个线程的池,或者总是 运行 在特定线程中分配给定 actor 的调度程序)。
该调度程序也是 Scala ExecutionContext
,它允许安排任意任务在其线程池上执行;此类任务包括 Future
回调。
所以在您的 actor 中,当收到 messageA
时会发生什么?
- 演员调用
createApi()
并保存
- 它在
api
上调用 callA
方法
- 关闭
api
- 它安排在
callA
的结果可用时转发给发件人
- 它现在已准备好处理另一条消息,可能会也可能不会实际处理另一条消息
这实际上意味着什么取决于 callA
的作用。如果 callA
在执行上下文中调度一个任务,一旦任务被调度并且回调已经安排好,它就会 return 未来;无法保证在 returned 时执行了任务或回调。一旦 future 被 returned,你的 actor 就会关闭 api
(所以这可能发生在任务或回调执行的任何时候)。
简而言之,根据 api
的实现方式(您可能无法控制它的实现方式)和实现细节,可以采用以下顺序
- 线程1(处理
messageA
)在调度器中设置任务
- 线程 1 关闭
api
并安排结果通过管道传输
- 线程2开始执行任务
- 线程 1 继续处理其他消息
- Thread2 的任务失败,因为
api
已关闭
简而言之,当混合 Future
s 和 actor 时,Akka 中的“单线程幻觉”可以被打破:任意多个线程都可以操纵 actor 的状态。
在这个例子中,因为 Future
land 和 actorland 之间的唯一共享状态是本地处理单个消息,所以还不错:这里有效的一般规则是:
- 一旦您将可变(例如可关闭)状态从 actor 传递给未来(这包括,除非您可以绝对确定正在发生什么,否则会在 return 是未来的有状态对象上调用一个方法), 演员最好忘记那个物体的存在
然后如何关闭api
?
好吧,假设 callA
没有对 api
做任何奇怪的事情(比如将实例保存在一些实例池中),在 messageA
完成处理和未来已完成,没有任何内容可以访问 api
。因此,最简单也可能是最正确的做法是安排 api
在 future 完成后关闭,按照这些行
val api = createApi()
val futureA: Future[F] = api.callA
futureA.foreach { _ => api.close() }
futureA.pipeTo(sender())
据我了解 Akka 并行性,Actor 使用一个线程来处理每个传入消息。这个线程包含一个状态。事实上,顺序消息不共享此状态。
但是 Actor 可能有一个 ExecutorContext 用于执行来自 Future 的回调。这就是我不再清楚地理解并行性的地方。
例如我们有以下演员:
class AnyActor(target: ActorRef) extends Actor {
implicit val ec: ExecutionContext = context.dispatcher
def receive = {
case messageA =>
val api = createApi()
val furureA: Future[F] = api.callA
api.close()
futureA.pipeTo(sender())
case messageB =>
val api = createApi()
val furureB: Future[F] = api.callB
api.close()
futureB.pipeTo(sender())
}
}
假设,Actor 收到 messageA,Thread1 创建 api 的实例 - 让我们调用“api1”。还有带有 N 个线程的 executionContext。此线程之一用于从 furureA 检索结果。
我不明白的是,这 N 个线程如何与 Thread1 相关联。 ExecutionContext 仅为 Thread1 创建?或者它也是为Thread2创建的(在其中处理了messageB)?
广义地说,调度器上的参与者 运行 从池中选择一个线程,而参与者 Receive
的 运行 从邮箱中选择一些消息。通常无法保证 actor 将 运行 在给定线程上(忽略空洞的示例,例如具有单个线程的池,或者总是 运行 在特定线程中分配给定 actor 的调度程序)。
该调度程序也是 Scala ExecutionContext
,它允许安排任意任务在其线程池上执行;此类任务包括 Future
回调。
所以在您的 actor 中,当收到 messageA
时会发生什么?
- 演员调用
createApi()
并保存 - 它在
api
上调用 - 关闭
api
- 它安排在
callA
的结果可用时转发给发件人 - 它现在已准备好处理另一条消息,可能会也可能不会实际处理另一条消息
callA
方法
这实际上意味着什么取决于 callA
的作用。如果 callA
在执行上下文中调度一个任务,一旦任务被调度并且回调已经安排好,它就会 return 未来;无法保证在 returned 时执行了任务或回调。一旦 future 被 returned,你的 actor 就会关闭 api
(所以这可能发生在任务或回调执行的任何时候)。
简而言之,根据 api
的实现方式(您可能无法控制它的实现方式)和实现细节,可以采用以下顺序
- 线程1(处理
messageA
)在调度器中设置任务 - 线程 1 关闭
api
并安排结果通过管道传输 - 线程2开始执行任务
- 线程 1 继续处理其他消息
- Thread2 的任务失败,因为
api
已关闭
简而言之,当混合 Future
s 和 actor 时,Akka 中的“单线程幻觉”可以被打破:任意多个线程都可以操纵 actor 的状态。
在这个例子中,因为 Future
land 和 actorland 之间的唯一共享状态是本地处理单个消息,所以还不错:这里有效的一般规则是:
- 一旦您将可变(例如可关闭)状态从 actor 传递给未来(这包括,除非您可以绝对确定正在发生什么,否则会在 return 是未来的有状态对象上调用一个方法), 演员最好忘记那个物体的存在
然后如何关闭api
?
好吧,假设 callA
没有对 api
做任何奇怪的事情(比如将实例保存在一些实例池中),在 messageA
完成处理和未来已完成,没有任何内容可以访问 api
。因此,最简单也可能是最正确的做法是安排 api
在 future 完成后关闭,按照这些行
val api = createApi()
val futureA: Future[F] = api.callA
futureA.foreach { _ => api.close() }
futureA.pipeTo(sender())