Cancel a Future after stopping an actor

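I have an application with a master/worker architecture. The Worker does heavy, long-running work, and the Master needs to be able to kill that work whenever required.

I first tried without a Future, but then the Worker cannot receive any messages while it is working. So I tried using a Future instead. However, when the Worker is stopped, the job keeps running. How can I release the resources after stopping the actor?

Here is the code.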

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

import scala.concurrent.Future
import scala.concurrent.duration._


object Main {

  case object StopTask

  case class DoTask(task: String)

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ClusterSystem")
    val master = system.actorOf(Props[Master], "master")
    master ! "FooTask"
    import system.dispatcher
    system.scheduler.scheduleOnce(5.seconds) {
      master ! StopTask
    }
  }

  class Master extends Actor {
    val worker: ActorRef = context.actorOf(Props[Worker], "worker")

    def receive: Receive = {
      case task: String => worker ! DoTask(task)
      case StopTask => context stop worker
    }
  }

  class Worker extends Actor {

    import context.dispatcher

    override def postStop(): Unit = {
      println("Stopping task...")
    }

    def receive: Receive = {
      case DoTask(task) =>
        Future {
          // Heavy, long-running job goes here
          while (true) {
            println(s"Doing $task...")
            Thread.sleep(1000)
          }
        }
    }
  }

}

The output is:

[INFO ] 2018-04-08 21:48:33,947 akka.event.slf4j.Slf4jLogger - Slf4jLogger started
[INFO ] 2018-04-08 21:48:34,244 akka.remote.Remoting - Starting remoting
[INFO ] 2018-04-08 21:48:34,463 akka.remote.Remoting - Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:49770]
[INFO ] 2018-04-08 21:48:34,466 akka.remote.Remoting - Remoting now listens on addresses: [akka.tcp://ClusterSystem@127.0.0.1:49770]
[INFO ] 2018-04-08 21:48:34,521 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Starting up...
[INFO ] 2018-04-08 21:48:34,717 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Registered cluster JMX MBean [akka:type=Cluster,port=49770]
[INFO ] 2018-04-08 21:48:34,718 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Started up successfully
Doing FooTask...
[INFO ] 2018-04-08 21:48:34,777 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Metrics collection has started successfully
[INFO ] 2018-04-08 21:48:35,017 akka.cluster.Cluster(akka://ClusterSystem) - Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:49770] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2560]
Doing FooTask...
Doing FooTask...
Doing FooTask...
Doing FooTask...
Stopping task...
Doing FooTask...
Doing FooTask...

I found a way to kill a Future, but I don't know how to integrate it into this architecture. I hope someone can help.

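Since my original answer (breaking up the Future's execution by recursively chaining the Future with itself) didn't really solve the problem, I did some research and came across this idea from Dr. Roland Kuhn: https://groups.google.com/d/msg/akka-user/nkD5BN17kVk/RpUbm07rvpMJ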

... you can also spawn a dedicated thread for running that and have an actor manage it; in this case you can call Thread.interrupt or Thread.stop any way you like while keeping the actor responsive.
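
For reference, the interruption mechanism the quote relies on can be sketched in plain Scala, without Akka. The names `InterruptDemo` and `runAndInterrupt` and the two latches are made up for this illustration; the only point is that a thread blocked in `Thread.sleep` receives an `InterruptedException` when another thread calls `interrupt()` on it.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object InterruptDemo {
  /** Starts a sleeping worker thread, interrupts it, and reports
    * whether the thread reached its interrupt handler within 5 seconds. */
  def runAndInterrupt(): Boolean = {
    val started  = new CountDownLatch(1)
    val finished = new CountDownLatch(1)

    val thread = new Thread(new Runnable {
      override def run(): Unit =
        try {
          started.countDown()
          while (true) Thread.sleep(100) // stands in for the long-running job
        } catch {
          // sleep() throws this when interrupt() is called on the thread
          case _: InterruptedException => finished.countDown()
        }
    })
    thread.setDaemon(true) // do not keep the JVM alive if something goes wrong
    thread.start()

    started.await()      // make sure the job is actually running
    thread.interrupt()   // request the stop
    finished.await(5, TimeUnit.SECONDS)
  }

  def main(args: Array[String]): Unit =
    println(s"worker exited after interrupt: ${runAndInterrupt()}")
}
```

Note that `interrupt()` only takes effect this way while the thread is in a blocking call such as `sleep`; a purely CPU-bound loop would have to poll `Thread.currentThread().isInterrupted` itself. `Thread.stop` is deprecated, so `interrupt` is the safer of the two options mentioned in the quote.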

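I tried the idea of having an actor manage the starting and stopping of a thread. Since actors are a good way to manage potentially expensive resources, an actor is indeed a good candidate for managing the thread. Here is my implementation: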

import akka.actor.{Actor, ActorRef}

/** Does the main work (i.e. training ML model). */
class WorkerThread(task: String, parent: ActorRef) extends Runnable {
  override def run(): Unit = try {
    while (true) {
      println(s"Doing $task...")
      Thread.sleep(500)
    }
  } catch {
    /* Since this thread may be interrupted at any time, we need to
       gracefully handle being interrupted. Since we have a handle to the
       actor that's managing us, we can send it a message telling it to
       finish up. */
    case _: InterruptedException => parent ! Worker.Message.FinishUp
  }
}

/** Manages starting and stopping the model training thread. */
class Worker extends Actor {
  private var thread: Thread = null

  override def receive: Receive = {
    case Worker.Message.DoTask(task) =>
      if (thread == null) {
        thread = new Thread(new WorkerThread(task, self))
        thread.start()
      }
    case Worker.Message.StopTask =>
      if (thread != null) thread.interrupt()
    case Worker.Message.FinishUp => println("Stopped task...")
  }
}

object Worker {
  sealed trait Message
  object Message {
    case class DoTask(task: String) extends Message
    case object StopTask extends Message
    case object FinishUp extends Message
  }
}
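
The Master in the question stops the worker with `context stop worker` rather than by sending `StopTask`. A possible variant, sketched here as an assumption and not part of the implementation above, also interrupts the thread from `postStop` and clears the thread reference on `FinishUp` so that a later task can start. `StoppableWorker`, `StoppableWorkerDemo` and the `exited` latch are hypothetical names used only to make the sketch self-contained and observable; it assumes akka-actor (classic) 2.4 or later on the classpath.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.Await
import scala.concurrent.duration._

object StoppableWorker {
  final case class DoTask(task: String)
  case object StopTask
  case object FinishUp
}

/** Hypothetical variant: the job thread never outlives the actor.
  * `exited` exists only so callers can observe that the thread ended. */
class StoppableWorker(exited: CountDownLatch) extends Actor {
  import StoppableWorker._

  private var thread: Option[Thread] = None

  private def job(task: String, parent: ActorRef): Runnable = new Runnable {
    override def run(): Unit =
      try {
        while (true) {
          println(s"Doing $task...")
          Thread.sleep(500)
        }
      } catch {
        case _: InterruptedException =>
          parent ! FinishUp // becomes a dead letter if the actor already stopped
          exited.countDown()
      }
  }

  override def receive: Receive = {
    case DoTask(task) =>
      if (thread.isEmpty) {
        val t = new Thread(job(task, self))
        t.setDaemon(true) // assumption: the job should not block JVM exit
        t.start()
        thread = Some(t)
      }
    case StopTask => thread.foreach(_.interrupt())
    case FinishUp =>
      thread = None // allow a new task to be started later
      println("Stopped task...")
  }

  // Also runs on `context stop worker`, so stopping the actor stops the job.
  override def postStop(): Unit = thread.foreach(_.interrupt())
}

object StoppableWorkerDemo {
  /** Starts a task, stops the actor, and reports whether the thread ended. */
  def run(): Boolean = {
    val system = ActorSystem("demo")
    val exited = new CountDownLatch(1)
    val worker = system.actorOf(Props(new StoppableWorker(exited)), "worker")
    worker ! StoppableWorker.DoTask("FooTask")
    Thread.sleep(1200)  // let the job print a few lines
    system.stop(worker) // same effect as the Master's `context stop worker`
    val ended = exited.await(5, TimeUnit.SECONDS)
    Await.result(system.terminate(), 10.seconds)
    ended
  }

  def main(args: Array[String]): Unit = println(s"thread ended: ${run()}")
}
```

With this variant the Master from the question can stay unchanged: stopping the worker actor is enough to end the job.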

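Now we can stop the ML model training at any time through its managing actor. It turns out that Future was the wrong level of abstraction here, and Thread is the right one.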
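
If interrupting a thread is not desirable and the job can be split into short steps, a cooperative flag is another option that keeps the work inside a Future. This is not taken from the answer above; it is a minimal sketch of a different technique, and `CancellableJob`, `start` and `step` are hypothetical names. The Future cannot be cancelled from outside, so the loop checks a shared `AtomicBoolean` between steps and exits on its own.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CancellableJob {
  /** Runs `step` repeatedly on `ec` until `cancelled` is set.
    * Completes with the number of steps that were executed. */
  def start(cancelled: AtomicBoolean)(step: () => Unit)(
      implicit ec: ExecutionContext): Future[Int] =
    Future {
      var steps = 0
      // The flag is checked between steps, so cancellation is only
      // as prompt as the duration of a single step.
      while (!cancelled.get()) {
        step()
        steps += 1
      }
      steps
    }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val cancelled = new AtomicBoolean(false)
    val job = start(cancelled)(() => Thread.sleep(50))
    Thread.sleep(300)
    cancelled.set(true) // an actor could do this on StopTask or in postStop
    println(s"steps before cancel: ${Await.result(job, 5.seconds)}")
  }
}
```

The trade-off is that a single long step cannot be cut short this way, which is where the thread-based approach above still has the advantage.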