如何对 Akka actor 系统进行 blocking/synchronous 调用?
How to make blocking/synchronous calls into an Akka actor system?
Akka 2.4.1 Java API 这里。我现在没有足够的带宽来学习 Scala,所以我会问这里的代码示例也使用 Java API.
我有一个现有的 ActorSystem
,它充满了异步 actors,并且工作得很好。我现在需要像这样在同步上下文中重用这个 actor 系统:
// Groovy pseudo-code; NOT inside the actor system here!
ComputationRequest request = new ComputationRequest(1, 7, true)
MasterActor master = actorSystem.get(...)
ComputationResult result = actorSystem.tell(master, request)
在 Akka 的文档中,我没有看到将请求发送到 actor 系统(从外部)然后检索结果的明确示例。我可以在这里使用 Futures
吗?在 Akka(代码示例)中处理这种模式的标准方法是什么?
ask 模式可以满足您的需求。它期望目标参与者通过 tell
发送到 getSender()
来 "return" 响应。您将得到一个 Future
的响应并可以使用它(如果必须的话,阻止)。
import akka.dispatch.*;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Await;
import scala.concurrent.Promise;
import akka.util.Timeout;
ComputationRequest request = new ComputationRequest(1, 7, true);
MasterActor master = actorSystem.get(...);
Timeout timeout = new Timeout(Duration.create(5, "seconds"));
Future<Object> future = Patterns.ask(master, request, timeout);
ComputationResult result = (ComputationResult) Await.result(future, timeout.duration());
您可以将包装在 actor 中的回调注入 akka 系统,并将其用作 "sender" 和 tell 机制。完整示例:
import akka.actor.*;
import akka.dispatch.Await;
import akka.dispatch.Future;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class Main {
public static void main(String[] args) throws Exception {
// init your akka system
final ActorSystem system = ActorSystem.create("Cambria");
final ActorRef worker = system.actorOf(new Props(Worker.class), "worker");
worker.tell(new Work(10));
// make callback
final Consumer<Object> callback = a -> System.out.println("from the other side: " + a);
// wrap call back into sub-actor
class TheSpy extends UntypedActor {
@Override
public void onReceive(final Object message) throws Exception {
callback.accept(message);
}
}
// inject callback into the system
final ActorRef theSpy = system.actorOf(new Props(TheSpy::new), "spy");
// find actor you want to hack
final ActorSelection found = system.actorSelection("/user/worker");
// send it a message and observe using callback)
found.tell(new Work(20), theSpy);
final Timeout timeout = new Timeout(5, TimeUnit.SECONDS);
final Future<Object> future = Patterns.ask(worker, new Work(30), timeout);
final Work result = (Work) Await.result(future, timeout.duration());
System.out.println(result);
system.shutdown();
system.awaitTermination();
}
}
public class Worker extends UntypedActor {
public void onReceive(Object message) {
if (message instanceof Work) {
Work work = (Work) message;
System.out.println("work = " + work);
getSender().tell(new Work(work.getStart() + 1));
} else {
unhandled(message);
}
}
}
public class Work {
private final int start;
public Work(int start) {
this.start = start;
}
public int getStart() {
return start;
}
@Override
public String toString() {
return "Work{" +
"start=" + start +
'}';
}
}
Akka 2.4.1 Java API 这里。我现在没有足够的带宽来学习 Scala,所以我会问这里的代码示例也使用 Java API.
我有一个现有的 ActorSystem
,它充满了异步 actors,并且工作得很好。我现在需要像这样在同步上下文中重用这个 actor 系统:
// Groovy pseudo-code; NOT inside the actor system here!
ComputationRequest request = new ComputationRequest(1, 7, true)
MasterActor master = actorSystem.get(...)
ComputationResult result = actorSystem.tell(master, request)
在 Akka 的文档中,我没有看到将请求发送到 actor 系统(从外部)然后检索结果的明确示例。我可以在这里使用 Futures
吗?在 Akka(代码示例)中处理这种模式的标准方法是什么?
ask 模式可以满足您的需求。它期望目标参与者通过 tell
发送到 getSender()
来 "return" 响应。您将得到一个 Future
的响应并可以使用它(如果必须的话,阻止)。
import akka.dispatch.*;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Await;
import scala.concurrent.Promise;
import akka.util.Timeout;
ComputationRequest request = new ComputationRequest(1, 7, true);
MasterActor master = actorSystem.get(...);
Timeout timeout = new Timeout(Duration.create(5, "seconds"));
Future<Object> future = Patterns.ask(master, request, timeout);
ComputationResult result = (ComputationResult) Await.result(future, timeout.duration());
您可以将包装在 actor 中的回调注入 akka 系统,并将其用作 "sender" 和 tell 机制。完整示例:
import akka.actor.*;
import akka.dispatch.Await;
import akka.dispatch.Future;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class Main {
public static void main(String[] args) throws Exception {
// init your akka system
final ActorSystem system = ActorSystem.create("Cambria");
final ActorRef worker = system.actorOf(new Props(Worker.class), "worker");
worker.tell(new Work(10));
// make callback
final Consumer<Object> callback = a -> System.out.println("from the other side: " + a);
// wrap call back into sub-actor
class TheSpy extends UntypedActor {
@Override
public void onReceive(final Object message) throws Exception {
callback.accept(message);
}
}
// inject callback into the system
final ActorRef theSpy = system.actorOf(new Props(TheSpy::new), "spy");
// find actor you want to hack
final ActorSelection found = system.actorSelection("/user/worker");
// send it a message and observe using callback)
found.tell(new Work(20), theSpy);
final Timeout timeout = new Timeout(5, TimeUnit.SECONDS);
final Future<Object> future = Patterns.ask(worker, new Work(30), timeout);
final Work result = (Work) Await.result(future, timeout.duration());
System.out.println(result);
system.shutdown();
system.awaitTermination();
}
}
public class Worker extends UntypedActor {
public void onReceive(Object message) {
if (message instanceof Work) {
Work work = (Work) message;
System.out.println("work = " + work);
getSender().tell(new Work(work.getStart() + 1));
} else {
unhandled(message);
}
}
}
public class Work {
private final int start;
public Work(int start) {
this.start = start;
}
public int getStart() {
return start;
}
@Override
public String toString() {
return "Work{" +
"start=" + start +
'}';
}
}