Akka 等待演员的计算
Akka wait for computation of actor
我想创建一个 actor 的最小示例,它向 actor 发送消息,然后等待该 actor 的响应。这个例子的原因是我想在我的论文中使用它来讨论使用其他语言特性(例如,期货)而不是纯粹的演员。所以这里的要点是它 必须 成为一个在处理其他任何事情之前等待消息的演员。
我的想法是演示一个 actor 请求从磁盘读取文件,然后进行一些长时间的计算,然后等待读取完成。
到目前为止我有以下内容:
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
import scala.io.Source
case class FileContents(content: String)
class WorkerActor extends Actor
{
def receive =
{
case "compute" =>
println("Computing!")
// Create actor to read the file
val reader = context.actorOf(Props[ReadFileActor])
reader ! ReadFile("/home/christophe/code/thesis-example/src/main/resources/file.txt")
// Heavy computation
Thread.sleep(5000)
case FileContents(content) =>
println("Got file content:\n" + content)
// Continue computation.
}
}
case class ReadFile(path: String)
class ReadFileActor extends Actor
{
def receive =
{
case ReadFile(path) =>
var contents: String = ""
for (line <- Source.fromFile(path).getLines())
{
contents += line
}
sender ! FileContents(contents)
}
}
object Main extends App
{
val system = ActorSystem("HelloSystem")
val worker = system.actorOf(Props[WorkerActor], name = "worker")
worker ! "compute"
worker ! "compute"
}
但是这里发生的是 WorkerActor
接收到计算消息,然后启动一个子 actor 来读取文件。在升沉计算之后,它接收到第二条 compute
消息。最后收到来自 ReadFile
演员的两条消息。
我真正想要发生的是 WorkerActor
收到 compute
消息,进行繁重的计算,然后等到他收到 FileContents
消息。只有在那之后它才能接收任何其他消息(即第二个 compute
消息)。
我已经阅读了文档并四处搜索示例,但我似乎无法在其中找到任何内容。
免责声明:除了我论文中的这个小例子,我没有使用 Akka。
只需为多个 compute
条消息创建多个 (pool of) 个工作人员而不是一个工作人员,smthng 如:
object Main extends App {
val system = ActorSystem("HelloSystem")
val router = system.actorOf(RoundRobinPool(2).props(Props[Worker]), "router")
router ! "compute"
router ! "compute"
}
如果您希望第二个 worker 在第一个之后启动:
def receive = {
case "compute" => ...
case FileContents(content) =>
println("Got file content:\n" + content)
// Continue computation.
context.parent ! "compute" //send to the pool
}
...
//Main:
router ! "compute"
另一种选择是记住 "compute"
的 sender
(这将是你的 Main
)并将响应发送回顶层:
var main = _
def receive = {
case "compute" =>
...
main = sender
case FileContents(content) =>
...
main ! "ack"
}
//Main:
(router ? "compute") foreach (_ => router ! compute)
如果你不喜欢这里的 future - 你可以用演员重写它:
//Main
class MainActor extends Actor {
def receive = {
case "start" => router ! "compute"
case "ack" => router ! "compute"
}
}
P.S。 actor 内部的阻塞计算应该 managed properly 否则它们可能会导致线程饥饿。
听起来您想要的是 keep processing
此消息,直到您得到回复。换句话说 block
class WorkerActor extends Actor {
def receive = {
case "compute" =>
println("Computing!")
// Create actor to read the file
val reader = context.actorOf(Props[ReadFileActor])
val future: Future[FileContents] =
(reader ? ReadFile("file.txt").mapTo[FileContents]
// Heavy computation
Thread.sleep(5000)
// this will block current thread and thus the actor,
// so it will not process any other messages until
// future is completed or time is out
Await.result(future, timeout)
}
}
BUT
,这在actor领域被认为是a very bad thing
。
我想创建一个 actor 的最小示例,它向 actor 发送消息,然后等待该 actor 的响应。这个例子的原因是我想在我的论文中使用它来讨论使用其他语言特性(例如,期货)而不是纯粹的演员。所以这里的要点是它 必须 成为一个在处理其他任何事情之前等待消息的演员。
我的想法是演示一个 actor 请求从磁盘读取文件,然后进行一些长时间的计算,然后等待读取完成。
到目前为止我有以下内容:
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
import scala.io.Source
case class FileContents(content: String)
class WorkerActor extends Actor
{
def receive =
{
case "compute" =>
println("Computing!")
// Create actor to read the file
val reader = context.actorOf(Props[ReadFileActor])
reader ! ReadFile("/home/christophe/code/thesis-example/src/main/resources/file.txt")
// Heavy computation
Thread.sleep(5000)
case FileContents(content) =>
println("Got file content:\n" + content)
// Continue computation.
}
}
case class ReadFile(path: String)
class ReadFileActor extends Actor
{
def receive =
{
case ReadFile(path) =>
var contents: String = ""
for (line <- Source.fromFile(path).getLines())
{
contents += line
}
sender ! FileContents(contents)
}
}
object Main extends App
{
val system = ActorSystem("HelloSystem")
val worker = system.actorOf(Props[WorkerActor], name = "worker")
worker ! "compute"
worker ! "compute"
}
但是这里发生的是 WorkerActor
接收到计算消息,然后启动一个子 actor 来读取文件。在升沉计算之后,它接收到第二条 compute
消息。最后收到来自 ReadFile
演员的两条消息。
我真正想要发生的是 WorkerActor
收到 compute
消息,进行繁重的计算,然后等到他收到 FileContents
消息。只有在那之后它才能接收任何其他消息(即第二个 compute
消息)。
我已经阅读了文档并四处搜索示例,但我似乎无法在其中找到任何内容。
免责声明:除了我论文中的这个小例子,我没有使用 Akka。
只需为多个 compute
条消息创建多个 (pool of) 个工作人员而不是一个工作人员,smthng 如:
object Main extends App {
val system = ActorSystem("HelloSystem")
val router = system.actorOf(RoundRobinPool(2).props(Props[Worker]), "router")
router ! "compute"
router ! "compute"
}
如果您希望第二个 worker 在第一个之后启动:
def receive = {
case "compute" => ...
case FileContents(content) =>
println("Got file content:\n" + content)
// Continue computation.
context.parent ! "compute" //send to the pool
}
...
//Main:
router ! "compute"
另一种选择是记住 "compute"
的 sender
(这将是你的 Main
)并将响应发送回顶层:
var main = _
def receive = {
case "compute" =>
...
main = sender
case FileContents(content) =>
...
main ! "ack"
}
//Main:
(router ? "compute") foreach (_ => router ! compute)
如果你不喜欢这里的 future - 你可以用演员重写它:
//Main
class MainActor extends Actor {
def receive = {
case "start" => router ! "compute"
case "ack" => router ! "compute"
}
}
P.S。 actor 内部的阻塞计算应该 managed properly 否则它们可能会导致线程饥饿。
听起来您想要的是 keep processing
此消息,直到您得到回复。换句话说 block
class WorkerActor extends Actor {
def receive = {
case "compute" =>
println("Computing!")
// Create actor to read the file
val reader = context.actorOf(Props[ReadFileActor])
val future: Future[FileContents] =
(reader ? ReadFile("file.txt").mapTo[FileContents]
// Heavy computation
Thread.sleep(5000)
// this will block current thread and thus the actor,
// so it will not process any other messages until
// future is completed or time is out
Await.result(future, timeout)
}
}
BUT
,这在actor领域被认为是a very bad thing
。