需要用Akka顺序执行操作

Need to execute operations sequentially with Akka

我有一个演员正在编排数据库更新。我需要确保每个操作仅在前一个操作完成后才执行。 这是因为操作 B 将重用操作 A 的结果。

这是我为演员写的代码。

class DbUpdateActor(databaseOperations: DBProvider) extends Actor {

  implicit val ec:ExecutionContext = context.system.dispatcher

  def receive: Receive = {

    case newInfo : UpdateDb =>

      val future = Future {
            // gets the current situation from DB 
            val status = databaseOperations.getSituation()
            // do db update
            databaseOperations.save(something)
      }

      future onComplete {
        case Success(result: List[Int]) =>
            //
        case Failure(err: Throwable) =>
            //
      }
  }
}

该代码适用于单个操作。如果我触发两个更新,那么第二个将异步执行,因此它会在第一个更新完成之前开始。

我正在阅读有关不同类型邮箱的信息,不确定使用不同的邮箱是否有帮助。

有什么建议吗?

您可以探索的一个选项是删除 Future 并允许该阻塞数据库代码在 actor 中 运行。然后,使用一个单独的调度程序(可能是 PinnedDispatcher)将此阻塞代码与主要参与者系统的调度程序隔离开来,为其提供自己的线程以 运行 打开。通过在正文中阻塞并删除 Future,您将确保演员邮箱的正确顺序执行。使该工作生效的更改的粗略草图如下:

object DbUpdateActor{
  def props(databaseOperations:DBProvider) =
    Props(classOf[DbUpdateActor], databaseOperations).
      withDispatcher("db-update-dispatcher")
}

class DbUpdateActor(databaseOperations: DBProvider) extends Actor {
  def receive: Receive = {

    case newInfo : UpdateDb =>
      val status = databaseOperations.getSituation()
      databaseOperations.save(something)
  }
}

然后,只要您在 actor 系统配置中配置了以下调度程序:

db-update-dispatcher {
  executor = "thread-pool-executor"
  type = PinnedDispatcher
}

然后您像这样启动了数据库更新 actor:

val updater = system.actorOf(DbUpdateActor.props(databaseOperations))

然后你应该准备好将这个 actor 设置为 运行 以不会对主调度程序的吞吐量产生负面影响的方式阻塞代码。

怎么样:在child开始操作A;当 child 完成时,它会向 parent 发送一条消息,说明它已完成。然后你可以开始操作B,在现有的或新的child.