akka:组合来自多个 children 的消息的模式
akka: pattern for combining messages from multiple children
这是我遇到的模式:
一个演员 A
有多个 children C1
, ..., Cn
。收到消息后,A
将其发送给它的每个 children,每个 children 都对消息进行一些计算,并在完成后将其发送回 A
。 A
然后想合并所有 children 的结果以传递给另一个演员。
这个问题的解决方案是什么样的?或者这是 anti-pattern?在什么情况下应该如何处理这个问题?
这是一个简单的例子,希望能说明我当前的解决方案。我担心的是重复代码(直到对称);不能很好地扩展到 children 的 'lots';并且很难看清发生了什么。
import akka.actor.{Props, Actor}
case class Tagged[T](value: T, id: Int)
class A extends Actor {
import C1._
import C2._
val c1 = context.actorOf(Props[C1], "C1")
val c2 = context.actorOf(Props[C2], "C2")
var uid = 0
var c1Results = Map[Int, Int]()
var c2Results = Map[Int, Int]()
def receive = {
case n: Int => {
c1 ! Tagged(n, uid)
c2 ! Tagged(n, uid)
uid += 1
}
case Tagged(C1Result(n), id) => c2Results get id match {
case None => c1Results += (id -> n)
case Some(m) => {
c2Results -= id
context.parent ! (n, m)
}
}
case Tagged(C2Result(n), id) => c1Results get id match {
case None => c2Results += (id -> n)
case Some(m) => {
c1Results -= id
context.parent ! (m, n)
}
}
}
}
class C1 extends Actor {
import C1._
def receive = {
case Tagged(n: Int, id) => Tagged(C1Result(n), id)
}
}
object C1 {
case class C1Result(n: Int)
}
class C2 extends Actor {
import C2._
def receive = {
case Tagged(n: Int, id) => Tagged(C2Result(n), id)
}
}
object C2 {
case class C2Result(n: Int)
}
如果你认为代码看起来像 god-awful,请放轻松,我才刚刚开始学习 akka ;)
您可以在 child 演员上使用 ?
而不是 !
- 这将导致 child 演员 return a Future
与他们的(最终)结果,即一切仍然 non-blocking 直到你 Await
Future
的结果。 parent actor 然后可以编写这些 Futures
并将其发送给另一个 actor - 它已经知道每个 Future's
身份,因此您无需担心标记每条消息,以便你可以稍后把它们放回原位。这是一个简单的示例,其中每个 child return 是一个随机的 Double
,并且您想将第一个 child 的 return 值除以第二个 [=22] =] 的 return 值(即顺序很重要)。
import scala.concurrent.duration._
import akka.actor.{Props, Actor}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
class A extends Actor {
val c1 = context.actorOf(Props[C], "C1")
val c2 = context.actorOf(Props[C], "C2")
// The ask operation involves creating an internal actor for handling
// this reply, which needs to have a timeout after which it is
// destroyed in order not to leak resources; see more below.
implicit val timeout = Timeout(5 seconds)
def receive = {
case _ => {
val f1 = c1 ? "anything" // Future[Any]
val f2 = c2 ? "anything" // Future[Any]
val result: Future[Double] = for {
d1 <- f1.mapTo[Double]
d2 <- f2.mapTo[Double]
} yield d1 / d2
}
}
class C extends Actor {
def receive = {
case _ => // random Double
}
}
如果有许多 - 或数量不定的 - child 演员,Zim-Zam 建议的 很快就会失控。
aggregator pattern 旨在帮助解决这种情况。它提供了一个 Aggregator 特性,您可以在 actor 中使用它来执行您的聚合逻辑。
想要执行聚合的客户端 actor 可以启动一个基于聚合器的 actor 实例并向它发送一条消息,以启动聚合过程。
应该为每个聚合操作创建一个新的聚合器,并在发回结果时终止(当它收到所有响应或超时时)。
下面列出了此模式的一个示例,用于对 Child class 代表的参与者持有的整数值求和。 (请注意,他们没有必要 children 由同一个 parent actor 监督:SummationAggregator 只需要一个 ActorRefs 的集合。)
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import akka.actor._
import akka.contrib.pattern.Aggregator
object Child {
def props(value: Int): Props = Props(new Child(value))
case object GetValue
case class GetValueResult(value: Int)
}
class Child(value: Int) extends Actor {
import Child._
def receive = { case GetValue => sender ! GetValueResult(value) }
}
object SummationAggregator {
def props = Props(new SummationAggregator)
case object TimedOut
case class StartAggregation(targets: Seq[ActorRef])
case object BadCommand
case class AggregationResult(sum: Int)
}
class SummationAggregator extends Actor with Aggregator {
import Child._
import SummationAggregator._
expectOnce {
case StartAggregation(targets) =>
// Could do what this handler does in line but handing off to a
// separate class encapsulates the state a little more cleanly
new Handler(targets, sender())
case _ =>
sender ! BadCommand
context stop self
}
class Handler(targets: Seq[ActorRef], originalSender: ActorRef) {
// Could just store a running total and keep track of the number of responses
// that we are awaiting...
var valueResults = Set.empty[GetValueResult]
context.system.scheduler.scheduleOnce(1.second, self, TimedOut)
expect {
case TimedOut =>
// It might make sense to respond with what we have so far if some responses are still awaited...
respondIfDone(respondAnyway = true)
}
if (targets.isEmpty)
respondIfDone()
else
targets.foreach { t =>
t ! GetValue
expectOnce {
case vr: GetValueResult =>
valueResults += vr
respondIfDone()
}
}
def respondIfDone(respondAnyway: Boolean = false) = {
if (respondAnyway || valueResults.size == targets.size) {
originalSender ! AggregationResult(valueResults.foldLeft(0) { case (acc, GetValueResult(v)) => acc + v })
context stop self
}
}
}
}
要从您的 parent 演员使用这个 SummationAggregator,您可以这样做:
context.actorOf(SummationAggregator.props) ! StartAggregation(children)
然后在 parent 的接收中的某处处理 AggregationResult。
这是我遇到的模式:
一个演员 A
有多个 children C1
, ..., Cn
。收到消息后,A
将其发送给它的每个 children,每个 children 都对消息进行一些计算,并在完成后将其发送回 A
。 A
然后想合并所有 children 的结果以传递给另一个演员。
这个问题的解决方案是什么样的?或者这是 anti-pattern?在什么情况下应该如何处理这个问题?
这是一个简单的例子,希望能说明我当前的解决方案。我担心的是重复代码(直到对称);不能很好地扩展到 children 的 'lots';并且很难看清发生了什么。
import akka.actor.{Props, Actor}
case class Tagged[T](value: T, id: Int)
class A extends Actor {
import C1._
import C2._
val c1 = context.actorOf(Props[C1], "C1")
val c2 = context.actorOf(Props[C2], "C2")
var uid = 0
var c1Results = Map[Int, Int]()
var c2Results = Map[Int, Int]()
def receive = {
case n: Int => {
c1 ! Tagged(n, uid)
c2 ! Tagged(n, uid)
uid += 1
}
case Tagged(C1Result(n), id) => c2Results get id match {
case None => c1Results += (id -> n)
case Some(m) => {
c2Results -= id
context.parent ! (n, m)
}
}
case Tagged(C2Result(n), id) => c1Results get id match {
case None => c2Results += (id -> n)
case Some(m) => {
c1Results -= id
context.parent ! (m, n)
}
}
}
}
class C1 extends Actor {
import C1._
def receive = {
case Tagged(n: Int, id) => Tagged(C1Result(n), id)
}
}
object C1 {
case class C1Result(n: Int)
}
class C2 extends Actor {
import C2._
def receive = {
case Tagged(n: Int, id) => Tagged(C2Result(n), id)
}
}
object C2 {
case class C2Result(n: Int)
}
如果你认为代码看起来像 god-awful,请放轻松,我才刚刚开始学习 akka ;)
您可以在 child 演员上使用 ?
而不是 !
- 这将导致 child 演员 return a Future
与他们的(最终)结果,即一切仍然 non-blocking 直到你 Await
Future
的结果。 parent actor 然后可以编写这些 Futures
并将其发送给另一个 actor - 它已经知道每个 Future's
身份,因此您无需担心标记每条消息,以便你可以稍后把它们放回原位。这是一个简单的示例,其中每个 child return 是一个随机的 Double
,并且您想将第一个 child 的 return 值除以第二个 [=22] =] 的 return 值(即顺序很重要)。
import scala.concurrent.duration._
import akka.actor.{Props, Actor}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
class A extends Actor {
val c1 = context.actorOf(Props[C], "C1")
val c2 = context.actorOf(Props[C], "C2")
// The ask operation involves creating an internal actor for handling
// this reply, which needs to have a timeout after which it is
// destroyed in order not to leak resources; see more below.
implicit val timeout = Timeout(5 seconds)
def receive = {
case _ => {
val f1 = c1 ? "anything" // Future[Any]
val f2 = c2 ? "anything" // Future[Any]
val result: Future[Double] = for {
d1 <- f1.mapTo[Double]
d2 <- f2.mapTo[Double]
} yield d1 / d2
}
}
class C extends Actor {
def receive = {
case _ => // random Double
}
}
如果有许多 - 或数量不定的 - child 演员,Zim-Zam 建议的
aggregator pattern 旨在帮助解决这种情况。它提供了一个 Aggregator 特性,您可以在 actor 中使用它来执行您的聚合逻辑。
想要执行聚合的客户端 actor 可以启动一个基于聚合器的 actor 实例并向它发送一条消息,以启动聚合过程。
应该为每个聚合操作创建一个新的聚合器,并在发回结果时终止(当它收到所有响应或超时时)。
下面列出了此模式的一个示例,用于对 Child class 代表的参与者持有的整数值求和。 (请注意,他们没有必要 children 由同一个 parent actor 监督:SummationAggregator 只需要一个 ActorRefs 的集合。)
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import akka.actor._
import akka.contrib.pattern.Aggregator
object Child {
def props(value: Int): Props = Props(new Child(value))
case object GetValue
case class GetValueResult(value: Int)
}
class Child(value: Int) extends Actor {
import Child._
def receive = { case GetValue => sender ! GetValueResult(value) }
}
object SummationAggregator {
def props = Props(new SummationAggregator)
case object TimedOut
case class StartAggregation(targets: Seq[ActorRef])
case object BadCommand
case class AggregationResult(sum: Int)
}
class SummationAggregator extends Actor with Aggregator {
import Child._
import SummationAggregator._
expectOnce {
case StartAggregation(targets) =>
// Could do what this handler does in line but handing off to a
// separate class encapsulates the state a little more cleanly
new Handler(targets, sender())
case _ =>
sender ! BadCommand
context stop self
}
class Handler(targets: Seq[ActorRef], originalSender: ActorRef) {
// Could just store a running total and keep track of the number of responses
// that we are awaiting...
var valueResults = Set.empty[GetValueResult]
context.system.scheduler.scheduleOnce(1.second, self, TimedOut)
expect {
case TimedOut =>
// It might make sense to respond with what we have so far if some responses are still awaited...
respondIfDone(respondAnyway = true)
}
if (targets.isEmpty)
respondIfDone()
else
targets.foreach { t =>
t ! GetValue
expectOnce {
case vr: GetValueResult =>
valueResults += vr
respondIfDone()
}
}
def respondIfDone(respondAnyway: Boolean = false) = {
if (respondAnyway || valueResults.size == targets.size) {
originalSender ! AggregationResult(valueResults.foldLeft(0) { case (acc, GetValueResult(v)) => acc + v })
context stop self
}
}
}
}
要从您的 parent 演员使用这个 SummationAggregator,您可以这样做:
context.actorOf(SummationAggregator.props) ! StartAggregation(children)
然后在 parent 的接收中的某处处理 AggregationResult。