在当前 actor 进程中使用来自 actor 的响应

Using a response from an actor in current actor process

我对如何以非阻塞方式解决这种情况感到困惑。

考虑两个演员 Actor1Actor2

Actor1

之内
Map<Int, Int> foo() {
     List<String> finalList = foo_2();
     Map<Int, Int> finalMap = // do stuff with finalList to get Map<Int, Int>;

     return finalMap;
}

List<String> foo_2() {
    
     CompletableFuture<List<String>> Querylist = ask(Actor2)
     Querylist.get();
     
     return QueryList;
}

目前在 foo_2 内,Querylist.get() 是一个阻塞调用。我想以某种方式以非阻塞方式解决这个问题。我在 Actor1 中为 Actor2 创建了一个消息适配器,因此 Actor2 发送的任何消息都将由 Actor1.

处理

我使用了下面的方法来修改阻塞调用

Map<Int, Int> foo() {
     CompletionStage<List<String>> finalList = foo_2();
     finalList.whenComplete(
        // what to do here? 
     )
     // Map<Int, Int> finalMap = // do stuff with finalList to get Map<Int, Int>;

     return finalMap;
}

CompletionStage<List<String>> foo_2() {
    
     CompletionStage<List<String>> Querylist = ask(Actor2)
     
     
     return QueryList;
}

我不确定如何正确使用 CompletionStage 构造来获得与阻塞 futures.get() 调用相同的结果。

如果您使用的是 Akka Typed(从标签中隐含),则根本不需要 future 或消息适配器。只需使用 ActorContext.ask.

参见documentation for request-response with ask between two actors

大体上,您将摆脱 foofoo_2 方法,将您设置的消息适配器移动到 ActorContext.ask 调用中,并替换其中的实例您之前曾通过 ActorContext.ask 呼叫 foo。如果您的参与者发送到导致请求的消息的回复取决于对请求的响应,那么一个好的做法是将所需的状态部分嵌入到适配器生成的消息中。

您可以使用 pipeToSelf,参见 https://doc.akka.io/docs/akka/current/typed/interaction-patterns.html#send-future-result-to-self,将请求的结果发送给 actor 本身。与其尝试直接在 foo() 中获取 finalList 的值,这只有通过阻塞 get 才有可能,foo() 的结果可以发送给 actor 本身,在这种情况下,您像处理任何其他消息一样处理它。最好为此创建一个特定的消息类型。

您还应该查看 CompletionStage 方法,最重要的是 thenApply (https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html#thenApply-java.util.function.Function-),它可以转换结果,例如创建 Map 来自 finalList,例如来自 MapMapMessage。然后,您将像处理 actor 中的任何其他消息一样处理 MapMessage