并行执行期货列表

Execute list of Futures in parallel

我对 Scala 的 Future 和 Akka 很陌生,目前,我正在尝试实现一个应用程序,该应用程序执行独立任务列表并将结果收集在一起。

例如,我想要一个由多个任务组成的应用程序,每个任务接收一个数字,休眠几秒钟,然后 return "Hello " 消息。

actor实现如下:

class HelloActor extends Actor {
  def receive = {
    case name:Int => {
      println("%s will sleep for %s seconds".format(name, name % 4))
      Thread.sleep(name % 4 * 1000)
      sender ! "Hello %s".format(name)
    }
  }
}

主要对象实现为:

object HelloAkka extends App {
  val system = ActorSystem("HelloSystem")

  val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")

  implicit val timeout = Timeout(20, TimeUnit.SECONDS)

  val futures = (1 to 10).map(num => {
    helloActor ? num
  })

  val future = Future.sequence(futures)

  val results = Await.result(future, timeout.duration)

  println(results)

  system.shutdown
}

由于每个任务会休眠 0、1、2 或 3 秒,我希望休眠时间较短的任务先执行。然而,结果是:

1 will sleep for 1 seconds
2 will sleep for 2 seconds
3 will sleep for 3 seconds
4 will sleep for 0 seconds
5 will sleep for 1 seconds
6 will sleep for 2 seconds
7 will sleep for 3 seconds
8 will sleep for 0 seconds
9 will sleep for 1 seconds
10 will sleep for 2 seconds
Vector(Hello 1, Hello 2, Hello 3, Hello 4, Hello 5, Hello 6, Hello 7, Hello 8, Hello 9, Hello 10)

换句话说,所有任务都是按顺序执行的。我想知道是否有任何方法可以让我并行执行所有任务。

如评论中所述,您将所有 tasks/messages 发送给一个演员,并且保证所有这些 tasks/messages 将按顺序处理。

要并行处理任务,您需要有多个处理程序 actor 实例,在您的情况下 HelloActor

当然,您可以只创建 HelloActor 的多个实例,但这绝对不是好的做法。

对于此类任务,您应该使用内置路由功能,它允许您管理 workers/handlers 池并通过一个 router actor 与它们交互,例如。

val router: ActorRef =
  context.actorOf(RoundRobinPool(10).props(Props[HelloActor]), "router")

...
router ? num
...

请按照 Akka Routing 文档获取更多详细信息。

我建议在 Future 中执行实际任务,而不是像评论和答案中建议的那样启动多个参与者。所以你的演员更像是任务的协调员。例如:

//...    

// import pipe pattern to get access to `pipeTo` method
import akka.pattern.pipe
import scala.concurrent.Future

// the `Future`s will be executed on this dispatcher
// depending on your needs, you may want to create a 
// dedicated executor for this
class TaskCoordinatorActor extends Actor {
  import context.dispatcher

  def receive = {
    case name: Int =>
      Future {
        Thread.sleep(name % 4 * 1000)
        "Hello %s".format(name)
      } pipeTo sender()
  }
}

以上代码在 scala.concurrent.Future 中执行您的任务并将结果通过管道传递给原始发件人。这样 actor 在任务完成之前不会阻塞,但是一旦 Future 被创建就准备好接收下一条消息。

P.S.: 你应该创建消息类型,而不是发送普通整数,明确你希望演员做什么。在您的情况下,它可能是:

case class Sleep(duration: Duration)

同一个actor发送给同一个actor的消息会依次执行。

你有两个选择。

要么为每个消息创建一个新的 HelloActor 副本,以便它们全部并行执行,要么将您的 HelloActor 修改为如下内容(可能导入错误,凭记忆):

import akka.pattern.pipe._

class HelloActor extends Actor {
  def receive = {
    case name:Int => {
      println("%s will sleep for %s seconds".format(name, name % 4))
      Future(sleepAndRespond(name)) pipeTo sender
   }
 }

 def sleepAndRespond(name:String) = {
   Thread.sleep(name % 4 * 1000)
   "Hello %s".format(innerName)
 }
}

这样,执行的顺序部分只是未来的管道,然后对十条消息中的每一条消息异步执行。