Cancel a Future after stopping an actor
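I have an application with a master-worker architecture. The Worker has to do heavy, long-running work, and the Master needs to be able to kill that work whenever we need to.
At first I tried it without a Future, but then the Worker could not receive any messages while it was working. So I switched to a Future instead. However, when the Worker is stopped, the job is still running. How can I release the resources after stopping the actor?
Here is the code.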
import akka.actor.{Actor, ActorRef, ActorSystem, Props, Terminated}
import scala.concurrent.Future
import scala.concurrent.duration._

object Main extends App {
  object StopTask
  case class DoTask(task: String)

  override def main(args: Array[String]): Unit = {
    val system = ActorSystem("ClusterSystem")
    val master = system.actorOf(Props[Master], "master")
    master ! "FooTask"
    import system.dispatcher
    system.scheduler.scheduleOnce(5.seconds) {
      master ! StopTask
    }
  }

  class Master extends Actor {
    val worker: ActorRef = context.actorOf(Props[Worker], "worker")

    def receive: Receive = {
      case task: String => worker ! DoTask(task)
      case StopTask     => context stop worker
    }
  }

  class Worker extends Actor {
    import context.dispatcher

    override def postStop(): Unit = {
      println("Stopping task...")
    }

    def receive: Receive = {
      case DoTask(task) =>
        Future {
          // Heavy, long-running job here
          while (true) {
            println(s"Doing $task...")
            Thread.sleep(1000)
          }
        }
    }
  }
}
The output is...
[INFO ] 2018-04-08 21:48:33,947 akka.event.slf4j.Slf4jLogger - Slf4jLogger started
[INFO ] 2018-04-08 21:48:34,244 akka.remote.Remoting - Starting remoting
[INFO ] 2018-04-08 21:48:34,463 akka.remote.Remoting - Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:49770]
[INFO ] 2018-04-08 21:48:34,466 akka.remote.Remoting - Remoting now listens on addresses: [akka.tcp://ClusterSystem@127.0.0.1:49770]
[INFO ] 2018-04-08 21:48:34,521 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Starting up...
[INFO ] 2018-04-08 21:48:34,717 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Registered cluster JMX MBean [akka:type=Cluster,port=49770]
[INFO ] 2018-04-08 21:48:34,718 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Started up successfully
Doing FooTask...
[INFO ] 2018-04-08 21:48:34,777 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Metrics collection has started successfully
[INFO ] 2018-04-08 21:48:35,017 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2560]
Doing FooTask...
Doing FooTask...
Doing FooTask...
Doing FooTask...
Stopping task...
Doing FooTask...
Doing FooTask...
I found a way to kill a Future, but I don't know how to integrate it into this architecture. I hope someone can help.
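Since my original answer (breaking up the Future's execution by recursively chaining the Future with itself) did not really solve the problem, I did some research and came across this idea from Dr. Roland Kuhn: https://groups.google.com/d/msg/akka-user/nkD5BN17kVk/RpUbm07rvpMJ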
... you can also spawn a dedicated thread for running that and have an actor manage it; in this case you can call Thread.interrupt or Thread.stop any way you like while keeping the actor responsive.
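I tried out the idea of having an actor manage starting and stopping the thread. Since actors are a good way to manage potentially expensive resources, an actor is a good candidate for managing a thread. Here is my implementation: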
import akka.actor.{Actor, ActorRef}

/** Does the main work (e.g. training an ML model). */
class WorkerThread(task: String, parent: ActorRef) extends Runnable {
  override def run(): Unit = try {
    while (true) {
      println(s"Doing $task...")
      Thread.sleep(500)
    }
  } catch {
    /* Since this thread may be interrupted at any time, we need to
       gracefully handle being interrupted. Since we have a handle to the
       actor that's managing us, we can send it a message telling it to
       finish up. */
    case _: InterruptedException => parent ! Worker.Message.FinishUp
  }
}

/** Manages starting and stopping the model training thread. */
class Worker extends Actor {
  private var thread: Thread = null

  override def receive: Receive = {
    case Worker.Message.DoTask(task) =>
      if (thread == null) {
        thread = new Thread(new WorkerThread(task, self))
        thread.start()
      }
    case Worker.Message.StopTask =>
      if (thread != null) thread.interrupt()
    case Worker.Message.FinishUp =>
      println("Stopped task...")
      thread = null // allow a later DoTask to start a new thread
  }

  // Also interrupt the thread when the actor itself is stopped
  // (e.g. via `context stop worker`), so the work does not outlive it.
  override def postStop(): Unit =
    if (thread != null) thread.interrupt()
}

object Worker {
  sealed trait Message
  object Message {
    case class DoTask(task: String) extends Message
    case object StopTask extends Message
    case object FinishUp extends Message
  }
}
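Now we can stop the ML model training at any time through the actor that manages it. It turns out that Future was the wrong level of abstraction here, and Thread is the right one.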
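One caveat worth spelling out: Thread.interrupt only sets a flag on the target thread. The loop in the implementation stops because Thread.sleep throws InterruptedException when that flag is set. A CPU-bound job that never makes a blocking call would probably need to poll the flag itself. The following is a minimal sketch of that idea using only the standard library; InterruptibleJob, InterruptDemo, steps and runAndInterrupt are hypothetical names chosen for illustration, and the counter stands in for real work.

```scala
import java.util.concurrent.atomic.AtomicLong

/** Hypothetical CPU-bound job that polls the interrupt flag between work units. */
class InterruptibleJob extends Runnable {
  // Counts completed work units; a stand-in for real progress.
  val steps = new AtomicLong(0)

  override def run(): Unit = {
    // There is no blocking call in this loop, so InterruptedException is
    // never thrown; the loop has to check the flag on each iteration.
    while (!Thread.currentThread().isInterrupted()) {
      steps.incrementAndGet() // one unit of (pretend) work
    }
    // Resources held by the job could be released here before the thread exits.
  }
}

object InterruptDemo {
  /** Starts the job, interrupts it after delayMillis, and reports whether the thread ended. */
  def runAndInterrupt(delayMillis: Long): Boolean = {
    val job = new InterruptibleJob
    val thread = new Thread(job)
    thread.start()
    Thread.sleep(delayMillis)
    thread.interrupt()
    thread.join(5000) // wait up to 5 seconds for the loop to notice the flag
    !thread.isAlive
  }
}
```

Inside the managing actor, the interrupt call stays the same; only the body of the Runnable changes. How often the job checks the flag determines how quickly it reacts to a stop request.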