How to make blocking/synchronous calls into an Akka actor system?

Akka 2.4.1 Java API here. I don't have the bandwidth to learn Scala right now, so I'd ask that code examples here also use the Java API.

I have an existing ActorSystem that is chock full of asynchronous actors and works beautifully. I now need to reuse this actor system from a synchronous context, like so:

// Groovy pseudo-code; NOT inside the actor system here!
ComputationRequest request = new ComputationRequest(1, 7, true)
MasterActor master = actorSystem.get(...)
ComputationResult result = actorSystem.tell(master, request)

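Nowhere in Akka's docs do I see a clear example of sending a request into an actor system (from the outside) and then retrieving the result. Can I use Futures here? What's the standard way of handling this pattern in Akka (code samples)?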

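The ask pattern does what you need. It expects the target actor to "return" a response by sending it to getSender() with tell. You get back a Future for the response and can work with that (blocking on it, if you must).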

import akka.actor.ActorRef;
import akka.pattern.Patterns;
import akka.util.Timeout;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;


ComputationRequest request = new ComputationRequest(1, 7, true);
// ask() takes an ActorRef, not the actor class; the lookup is elided as in the question
ActorRef master = actorSystem.get(...);

Timeout timeout = new Timeout(Duration.create(5, "seconds"));
Future<Object> future = Patterns.ask(master, request, timeout);
// Blocks the calling thread; throws java.util.concurrent.TimeoutException if no reply arrives in time
ComputationResult result = (ComputationResult) Await.result(future, timeout.duration());
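
If it helps to see the mechanics without the Akka dependency, here is a rough JDK-only analogue of what happens above. It is not Akka code and not how Akka implements ask internally; MiniActor, ask and askBlocking are hypothetical names made up for this sketch. The "actor" is a single thread draining a mailbox, ask enqueues a message and hands back a future for the reply, and askBlocking plays the role of Await.result by blocking the caller until the reply arrives or the timeout expires.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

public class AskSketch {

    /** Hypothetical stand-in for an actor: one thread processing messages in order. */
    static final class MiniActor<Req, Res> implements AutoCloseable {
        private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
        private final Function<Req, Res> behavior;

        MiniActor(Function<Req, Res> behavior) {
            this.behavior = behavior;
        }

        /** Rough analogue of Patterns.ask: enqueue the message, return a future for the reply. */
        CompletableFuture<Res> ask(Req message) {
            CompletableFuture<Res> reply = new CompletableFuture<>();
            // Completing the future here corresponds to the actor replying to its sender.
            mailbox.execute(() -> reply.complete(behavior.apply(message)));
            return reply;
        }

        @Override
        public void close() {
            mailbox.shutdownNow();
        }
    }

    /** Rough analogue of Await.result: block the caller until the reply or the timeout. */
    static <Req, Res> Res askBlocking(MiniActor<Req, Res> actor, Req message, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        return actor.ask(message).get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        try (MiniActor<Integer, Integer> worker = new MiniActor<>(n -> n + 1)) {
            System.out.println(askBlocking(worker, 30, 5000)); // prints 31
        }
    }
}
```

The point of the sketch is only that the blocking happens on the caller's side: the actor itself stays asynchronous, and the caller should be prepared for a TimeoutException when no reply shows up in time.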

You can inject a callback, wrapped in an actor, into the Akka system and use it as the "sender" with the tell mechanism. Full example:

import akka.actor.*;
import scala.concurrent.Await;
import scala.concurrent.Future;
import akka.pattern.Patterns;
import akka.util.Timeout;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class Main {

    public static void main(String[] args) throws Exception {

        // init your akka system
        final ActorSystem system = ActorSystem.create("Cambria");
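        final ActorRef worker = system.actorOf(Props.create(Worker.class), "worker");
        // fire-and-forget: with no sender, the worker's reply goes to dead letters
        worker.tell(new Work(10), ActorRef.noSender());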

        // make callback
        final Consumer<Object> callback = a -> System.out.println("from the other side: " + a);

        // wrap call back into sub-actor
        class TheSpy extends UntypedActor {
            @Override
            public void onReceive(final Object message) throws Exception {
                callback.accept(message);
            }
        }

        // inject callback into the system
        final ActorRef theSpy = system.actorOf(Props.create(TheSpy.class, TheSpy::new), "spy");

        // find actor you want to hack
        final ActorSelection found = system.actorSelection("/user/worker");
        // send it a message and observe the reply through the callback
        found.tell(new Work(20), theSpy);

        final Timeout timeout = new Timeout(5, TimeUnit.SECONDS);
        final Future<Object> future = Patterns.ask(worker, new Work(30), timeout);
        final Work result = (Work) Await.result(future, timeout.duration());
        System.out.println(result);

        system.shutdown();
        system.awaitTermination();
    }
}

public class Worker extends UntypedActor {
    public void onReceive(Object message) {
        if (message instanceof Work) {
            Work work = (Work) message;
            System.out.println("work = " + work);
            getSender().tell(new Work(work.getStart() + 1), getSelf());
        } else {
            unhandled(message);
        }
    }
}

public class Work {
    private final int start;

    public Work(int start) {
        this.start = start;
    }

    public int getStart() {
        return start;
    }

    @Override
    public String toString() {
        return "Work{" +
                "start=" + start +
                '}';
    }
}