需要用Akka顺序执行操作
Need to execute operations sequentially with Akka
我有一个演员正在编排数据库更新。我需要确保每个操作仅在前一个操作完成后才执行。
这是因为操作 B 将重用操作 A 的结果。
这是我为演员写的代码。
class DbUpdateActor(databaseOperations: DBProvider) extends Actor {
implicit val ec:ExecutionContext = context.system.dispatcher
def receive: Receive = {
case newInfo : UpdateDb =>
val future = Future {
// gets the current situation from DB
val status = databaseOperations.getSituation()
// do db update
databaseOperations.save(something)
}
future onComplete {
case Success(result: List[Int]) =>
//
case Failure(err: Throwable) =>
//
}
}
}
该代码适用于单个操作。如果我触发两个更新,那么第二个将异步执行,因此它会在第一个更新完成之前开始。
我正在阅读有关不同类型邮箱的信息,不确定使用不同的邮箱是否有帮助。
有什么建议吗?
您可以探索的一个选项是删除 Future
并允许该阻塞数据库代码在 actor 中 运行。然后,使用一个单独的调度程序(可能是 PinnedDispatcher)将此阻塞代码与主要参与者系统的调度程序隔离开来,为其提供自己的线程以 运行 打开。通过在正文中阻塞并删除 Future
,您将确保演员邮箱的正确顺序执行。使该工作生效的更改的粗略草图如下:
object DbUpdateActor{
def props(databaseOperations:DBProvider) =
Props(classOf[DbUpdateActor], databaseOperations).
withDispatcher("db-update-dispatcher")
}
class DbUpdateActor(databaseOperations: DBProvider) extends Actor {
def receive: Receive = {
case newInfo : UpdateDb =>
val status = databaseOperations.getSituation()
databaseOperations.save(something)
}
}
然后,只要您在 actor 系统配置中配置了以下调度程序:
db-update-dispatcher {
executor = "thread-pool-executor"
type = PinnedDispatcher
}
然后您像这样启动了数据库更新 actor:
val updater = system.actorOf(DbUpdateActor.props(databaseOperations))
然后你应该准备好将这个 actor 设置为 运行 以不会对主调度程序的吞吐量产生负面影响的方式阻塞代码。
怎么样:在child开始操作A;当 child 完成时,它会向 parent 发送一条消息,说明它已完成。然后你可以开始操作B,在现有的或新的child.
我有一个演员正在编排数据库更新。我需要确保每个操作仅在前一个操作完成后才执行。 这是因为操作 B 将重用操作 A 的结果。
这是我为演员写的代码。
class DbUpdateActor(databaseOperations: DBProvider) extends Actor {
implicit val ec:ExecutionContext = context.system.dispatcher
def receive: Receive = {
case newInfo : UpdateDb =>
val future = Future {
// gets the current situation from DB
val status = databaseOperations.getSituation()
// do db update
databaseOperations.save(something)
}
future onComplete {
case Success(result: List[Int]) =>
//
case Failure(err: Throwable) =>
//
}
}
}
该代码适用于单个操作。如果我触发两个更新,那么第二个将异步执行,因此它会在第一个更新完成之前开始。
我正在阅读有关不同类型邮箱的信息,不确定使用不同的邮箱是否有帮助。
有什么建议吗?
您可以探索的一个选项是删除 Future
并允许该阻塞数据库代码在 actor 中 运行。然后,使用一个单独的调度程序(可能是 PinnedDispatcher)将此阻塞代码与主要参与者系统的调度程序隔离开来,为其提供自己的线程以 运行 打开。通过在正文中阻塞并删除 Future
,您将确保演员邮箱的正确顺序执行。使该工作生效的更改的粗略草图如下:
object DbUpdateActor{
def props(databaseOperations:DBProvider) =
Props(classOf[DbUpdateActor], databaseOperations).
withDispatcher("db-update-dispatcher")
}
class DbUpdateActor(databaseOperations: DBProvider) extends Actor {
def receive: Receive = {
case newInfo : UpdateDb =>
val status = databaseOperations.getSituation()
databaseOperations.save(something)
}
}
然后,只要您在 actor 系统配置中配置了以下调度程序:
db-update-dispatcher {
executor = "thread-pool-executor"
type = PinnedDispatcher
}
然后您像这样启动了数据库更新 actor:
val updater = system.actorOf(DbUpdateActor.props(databaseOperations))
然后你应该准备好将这个 actor 设置为 运行 以不会对主调度程序的吞吐量产生负面影响的方式阻塞代码。
怎么样:在child开始操作A;当 child 完成时,它会向 parent 发送一条消息,说明它已完成。然后你可以开始操作B,在现有的或新的child.