使用 Futures 的 actors 内的并发
Concurrency within actors using Futures
我想知道是否有更好的方法来处理 Actor 中值的异步初始化。 Actor 在 actor 内部时当然是线程安全的,但是使用 Futures 会带来麻烦(并且你必须确保你不会关闭超过 context
或 sender
)考虑以下几点:
class ExampleActor(ref1: ActorRef, ref2: ActorRef) extends Actor {
implicit val ec = context.dispatcher
val promise1 = Promise[Int]
val promise2 = Promise[Int]
def receive = {
case Request1.Response(x) => promise1.success(x)
case Request2.Response(y) => promise2.success(y)
case CombinedResponse(x, y) => x + y
}
promise1.future foreach { x =>
promise2.future foreach { y =>
self ! CombinedResponse(x, y)
}
}
ref1 ! Request1
ref2 ! Request2
}
是否有 better/more 惯用的方式来处理像这样的并行请求?
您实际上不需要期货来处理多部分响应:
var x: Option[Int] = None
var y: Option[Int] = None
def receive = {
case Request1.Response(x) => x = Some(x); checkParts
case Request2.Response(y) => y = Some(y); checkParts
}
def checkParts = for {
xx <- x
yy <- y
} parent ! xx + yy
顺便说一下,即使是 futures,你也可以用同样的方式使用 for-comprehension。
管理 actor 状态的更实用的方法:
case class Resp1(x: Int)
case class Resp2(y: Int)
case class State(x: Option[Int], y: Option[Int])
class Worker(parent: ActorRef) extends Actor {
def receive = process(State(None, None))
def process(s: State): Receive = edge(s) andThen { sn =>
context become process(sn)
for {
xx <- sn.x
yy <- sn.y
} parent ! xx + yy //action
}
def edge(s: State): PartialFunction[Any, State] = { //managing state
case Resp1(x) => s.copy(x = Some(x))
case Resp2(y) => s.copy(y = Some(y))
}
}
重用 actor 而不是创建 future 更好,因为 promise.success
实际上通过将任务提交给执行程序来产生不可管理的副作用,因此这不是一种纯粹的功能方式。 Actor 的状态更好,因为 actor 内部的副作用始终与其他 actor 的一致 - 它们是逐步应用的,并且仅响应某些消息。所以你可能会看到演员就像 fold
在无限集合上; actor 发送的状态和消息(也是无限的)可以看作是 fold 的累加器。
说到 Akka,它的 actor 有一些 IoC 特性,比如自动异常处理(通过监督),这在未来是不可用的。在您的情况下,您必须将 return 的附加复合消息引入到演员的 IoC 上下文中。添加除 self ! CombinedResponse(x, y)
之外的任何其他操作(例如,某些其他开发人员可能会不小心执行某些操作以实施某些解决方法)是不安全的。
我想知道是否有更好的方法来处理 Actor 中值的异步初始化。 Actor 在 actor 内部时当然是线程安全的,但是使用 Futures 会带来麻烦(并且你必须确保你不会关闭超过 context
或 sender
)考虑以下几点:
class ExampleActor(ref1: ActorRef, ref2: ActorRef) extends Actor {
implicit val ec = context.dispatcher
val promise1 = Promise[Int]
val promise2 = Promise[Int]
def receive = {
case Request1.Response(x) => promise1.success(x)
case Request2.Response(y) => promise2.success(y)
case CombinedResponse(x, y) => x + y
}
promise1.future foreach { x =>
promise2.future foreach { y =>
self ! CombinedResponse(x, y)
}
}
ref1 ! Request1
ref2 ! Request2
}
是否有 better/more 惯用的方式来处理像这样的并行请求?
您实际上不需要期货来处理多部分响应:
var x: Option[Int] = None
var y: Option[Int] = None
def receive = {
case Request1.Response(x) => x = Some(x); checkParts
case Request2.Response(y) => y = Some(y); checkParts
}
def checkParts = for {
xx <- x
yy <- y
} parent ! xx + yy
顺便说一下,即使是 futures,你也可以用同样的方式使用 for-comprehension。
管理 actor 状态的更实用的方法:
case class Resp1(x: Int)
case class Resp2(y: Int)
case class State(x: Option[Int], y: Option[Int])
class Worker(parent: ActorRef) extends Actor {
def receive = process(State(None, None))
def process(s: State): Receive = edge(s) andThen { sn =>
context become process(sn)
for {
xx <- sn.x
yy <- sn.y
} parent ! xx + yy //action
}
def edge(s: State): PartialFunction[Any, State] = { //managing state
case Resp1(x) => s.copy(x = Some(x))
case Resp2(y) => s.copy(y = Some(y))
}
}
重用 actor 而不是创建 future 更好,因为 promise.success
实际上通过将任务提交给执行程序来产生不可管理的副作用,因此这不是一种纯粹的功能方式。 Actor 的状态更好,因为 actor 内部的副作用始终与其他 actor 的一致 - 它们是逐步应用的,并且仅响应某些消息。所以你可能会看到演员就像 fold
在无限集合上; actor 发送的状态和消息(也是无限的)可以看作是 fold 的累加器。
说到 Akka,它的 actor 有一些 IoC 特性,比如自动异常处理(通过监督),这在未来是不可用的。在您的情况下,您必须将 return 的附加复合消息引入到演员的 IoC 上下文中。添加除 self ! CombinedResponse(x, y)
之外的任何其他操作(例如,某些其他开发人员可能会不小心执行某些操作以实施某些解决方法)是不安全的。