并行执行期货列表
Execute list of Futures in parallel
我对 Scala 的 Future 和 Akka 很陌生,目前,我正在尝试实现一个应用程序,该应用程序执行独立任务列表并将结果收集在一起。
例如,我想要一个由多个任务组成的应用程序,每个任务接收一个数字,休眠几秒钟,然后 return "Hello " 消息。
actor实现如下:
class HelloActor extends Actor {
def receive = {
case name:Int => {
println("%s will sleep for %s seconds".format(name, name % 4))
Thread.sleep(name % 4 * 1000)
sender ! "Hello %s".format(name)
}
}
}
主要对象实现为:
object HelloAkka extends App {
val system = ActorSystem("HelloSystem")
val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")
implicit val timeout = Timeout(20, TimeUnit.SECONDS)
val futures = (1 to 10).map(num => {
helloActor ? num
})
val future = Future.sequence(futures)
val results = Await.result(future, timeout.duration)
println(results)
system.shutdown
}
由于每个任务会休眠 0、1、2 或 3 秒,我希望休眠时间较短的任务先执行。然而,结果是:
1 will sleep for 1 seconds
2 will sleep for 2 seconds
3 will sleep for 3 seconds
4 will sleep for 0 seconds
5 will sleep for 1 seconds
6 will sleep for 2 seconds
7 will sleep for 3 seconds
8 will sleep for 0 seconds
9 will sleep for 1 seconds
10 will sleep for 2 seconds
Vector(Hello 1, Hello 2, Hello 3, Hello 4, Hello 5, Hello 6, Hello 7, Hello 8, Hello 9, Hello 10)
换句话说,所有任务都是按顺序执行的。我想知道是否有任何方法可以让我并行执行所有任务。
如评论中所述,您将所有 tasks/messages 发送给一个演员,并且保证所有这些 tasks/messages 将按顺序处理。
要并行处理任务,您需要有多个处理程序 actor 实例,在您的情况下 HelloActor
。
当然,您可以只创建 HelloActor
的多个实例,但这绝对不是好的做法。
对于此类任务,您应该使用内置路由功能,它允许您管理 workers/handlers 池并通过一个 router
actor 与它们交互,例如。
val router: ActorRef =
context.actorOf(RoundRobinPool(10).props(Props[HelloActor]), "router")
...
router ? num
...
请按照 Akka Routing 文档获取更多详细信息。
我建议在 Future
中执行实际任务,而不是像评论和答案中建议的那样启动多个参与者。所以你的演员更像是任务的协调员。例如:
//...
// import pipe pattern to get access to `pipeTo` method
import akka.pattern.pipe
import scala.concurrent.Future
// the `Future`s will be executed on this dispatcher
// depending on your needs, you may want to create a
// dedicated executor for this
class TaskCoordinatorActor extends Actor {
import context.dispatcher
def receive = {
case name: Int =>
Future {
Thread.sleep(name % 4 * 1000)
"Hello %s".format(name)
} pipeTo sender()
}
}
以上代码在 scala.concurrent.Future
中执行您的任务并将结果通过管道传递给原始发件人。这样 actor 在任务完成之前不会阻塞,但是一旦 Future
被创建就准备好接收下一条消息。
P.S.: 你应该创建消息类型,而不是发送普通整数,明确你希望演员做什么。在您的情况下,它可能是:
case class Sleep(duration: Duration)
同一个actor发送给同一个actor的消息会依次执行。
你有两个选择。
要么为每个消息创建一个新的 HelloActor 副本,以便它们全部并行执行,要么将您的 HelloActor 修改为如下内容(可能导入错误,凭记忆):
import akka.pattern.pipe._
class HelloActor extends Actor {
def receive = {
case name:Int => {
println("%s will sleep for %s seconds".format(name, name % 4))
Future(sleepAndRespond(name)) pipeTo sender
}
}
def sleepAndRespond(name:String) = {
Thread.sleep(name % 4 * 1000)
"Hello %s".format(innerName)
}
}
这样,执行的顺序部分只是未来的管道,然后对十条消息中的每一条消息异步执行。
我对 Scala 的 Future 和 Akka 很陌生,目前,我正在尝试实现一个应用程序,该应用程序执行独立任务列表并将结果收集在一起。
例如,我想要一个由多个任务组成的应用程序,每个任务接收一个数字,休眠几秒钟,然后 return "Hello " 消息。
actor实现如下:
class HelloActor extends Actor {
def receive = {
case name:Int => {
println("%s will sleep for %s seconds".format(name, name % 4))
Thread.sleep(name % 4 * 1000)
sender ! "Hello %s".format(name)
}
}
}
主要对象实现为:
object HelloAkka extends App {
val system = ActorSystem("HelloSystem")
val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")
implicit val timeout = Timeout(20, TimeUnit.SECONDS)
val futures = (1 to 10).map(num => {
helloActor ? num
})
val future = Future.sequence(futures)
val results = Await.result(future, timeout.duration)
println(results)
system.shutdown
}
由于每个任务会休眠 0、1、2 或 3 秒,我希望休眠时间较短的任务先执行。然而,结果是:
1 will sleep for 1 seconds
2 will sleep for 2 seconds
3 will sleep for 3 seconds
4 will sleep for 0 seconds
5 will sleep for 1 seconds
6 will sleep for 2 seconds
7 will sleep for 3 seconds
8 will sleep for 0 seconds
9 will sleep for 1 seconds
10 will sleep for 2 seconds
Vector(Hello 1, Hello 2, Hello 3, Hello 4, Hello 5, Hello 6, Hello 7, Hello 8, Hello 9, Hello 10)
换句话说,所有任务都是按顺序执行的。我想知道是否有任何方法可以让我并行执行所有任务。
如评论中所述,您将所有 tasks/messages 发送给一个演员,并且保证所有这些 tasks/messages 将按顺序处理。
要并行处理任务,您需要有多个处理程序 actor 实例,在您的情况下 HelloActor
。
当然,您可以只创建 HelloActor
的多个实例,但这绝对不是好的做法。
对于此类任务,您应该使用内置路由功能,它允许您管理 workers/handlers 池并通过一个 router
actor 与它们交互,例如。
val router: ActorRef =
context.actorOf(RoundRobinPool(10).props(Props[HelloActor]), "router")
...
router ? num
...
请按照 Akka Routing 文档获取更多详细信息。
我建议在 Future
中执行实际任务,而不是像评论和答案中建议的那样启动多个参与者。所以你的演员更像是任务的协调员。例如:
//...
// import pipe pattern to get access to `pipeTo` method
import akka.pattern.pipe
import scala.concurrent.Future
// the `Future`s will be executed on this dispatcher
// depending on your needs, you may want to create a
// dedicated executor for this
class TaskCoordinatorActor extends Actor {
import context.dispatcher
def receive = {
case name: Int =>
Future {
Thread.sleep(name % 4 * 1000)
"Hello %s".format(name)
} pipeTo sender()
}
}
以上代码在 scala.concurrent.Future
中执行您的任务并将结果通过管道传递给原始发件人。这样 actor 在任务完成之前不会阻塞,但是一旦 Future
被创建就准备好接收下一条消息。
P.S.: 你应该创建消息类型,而不是发送普通整数,明确你希望演员做什么。在您的情况下,它可能是:
case class Sleep(duration: Duration)
同一个actor发送给同一个actor的消息会依次执行。
你有两个选择。
要么为每个消息创建一个新的 HelloActor 副本,以便它们全部并行执行,要么将您的 HelloActor 修改为如下内容(可能导入错误,凭记忆):
import akka.pattern.pipe._
class HelloActor extends Actor {
def receive = {
case name:Int => {
println("%s will sleep for %s seconds".format(name, name % 4))
Future(sleepAndRespond(name)) pipeTo sender
}
}
def sleepAndRespond(name:String) = {
Thread.sleep(name % 4 * 1000)
"Hello %s".format(innerName)
}
}
这样,执行的顺序部分只是未来的管道,然后对十条消息中的每一条消息异步执行。