akka 系统 - 在参与者之间执行通用逻辑的最佳实践

akka system - best practice to execute common logic between actors

我是 akka actor 系统的新手,一直在思考在 actor 中执行通用逻辑的最佳实践是什么。 所以这是一个例子:

我有以下演员:

class MyActor @Inject()(eventBus: EventBus) extends Actor{
    eventBus.subscribe(context.self, Topics.TimeoffPolicy)
    override def receive: Receive = {
       case Foo(name:String) => {
           //Need to execute foo related logic, and then Bar related logic
       }
       case Bar(name:String) => {
             //Need to execute bar related logic
       }
       case a: Any => log.warning(f"unknown message actor ${a.toString}")
    }
}

所以一个选择是将公共逻辑提取到一个方法中,并在像这样处理 Foo 时调用它:

class MyActor @Inject()(eventBus: EventBus) extends Actor{
    eventBus.subscribe(context.self, Topics.TimeoffPolicy)
    override def receive: Receive = {
       case Foo(name:String) => {
           foo(name)
           bar(name)
       }
       case Bar(name:String) => {
           bar(name)
       }
       case a: Any => log.warning(f"unknown message actor ${a.toString}")
    }
}

其他选项是向我自己发送消息:

class MyActor @Inject()(eventBus: EventBus) extends Actor{
    eventBus.subscribe(context.self, Topics.TimeoffPolicy)
    override def receive: Receive = {
       case Foo(name:String) => {
           foo()
           self ! Bar(name)
       }
       case Bar(name:String) => {
           bar(name)
       }
       case a: Any => log.warning(f"unknown message actor ${a.toString}")
    }
}

在这里,发送一条消息并将逻辑封装在每个消息处理中是有意义的,但我想将公共逻辑提取到一个方法并调用它会减少错误修剪。推荐什么?

对于 actor 之间的通用逻辑也是如此,一种选择是通过事件总线将消息发送到另一个 actor 以进行调用,因此 actor1 将执行 foo,而另一个将执行 bar。 第二种选择是将相同的逻辑提取到另一个 class 中,并将 class 注入两个 actor,因此为了执行 foo 和 bar,不会有 actor 之间的通信。

你怎么看?

这两种方法对我来说似乎都是合法的,我会选择两者之一,具体取决于我是否调用了共享逻辑中的其他参与者。基本上,同步逻辑可以通过提取一个方法来完美重用,而异步逻辑则需要将消息传回给自己。

发给 self 的消息也绝对首选从 Future 回调发送(实际上,这是唯一正确的方法,以免打乱 actor 任务的执行顺序) 和预定活动中。

这里有一个片段来说明同步方法:

class MyActor extends Actor with ActorLogging {
  def receive = {
    case Foo(foo) =>
      doSomeFooSpecificWork()
      logFooOrBar()
      sender() ! "foo done"
    case Bar =>
      logFooOrBar()
      sender() ! "bar done"
  }

  def logFooOrBar() = log.debug("A common message is handled")
}

下面是我要为异步写的内容:

import akka.pattern.{ask, pipe}
class MyActor extends Actor {
  val logger = context.actorOf(Props[DedicatedLogger])
  def receive = {
    case Foo(foo) =>
      doSomeFooSpecificWork()
      loggerActor ? LogFooOrBar(sender(), foo) pipeTo self

    case Bar(bar) =>
      loggerActor ? LogFooOrBar(sender(), bar) pipeTo self

    case LoggedFoo(reportTo, foo) => reportTo ! "foo done"

    case LoggedBar(reportTo, bar) => reportTo ! "bar done"
  }
}