例如:在询问时丢失了对 `child.path.name` 的引用

Akka: lost reference for `child.path.name` on ask

我正在尝试实现 Manning 的 "Akka in Action" 书中 "Up and Running" example 的 Java 版本。它是一个基于 Actor 模型的简单 Http 服务器,用于保存(仅在内存中)和检索一些事件。保存事件没有问题。但是在查询我的 actor 系统的事件(所有事件)时我确实遇到了问题。

这是 BoxOffice - parent actor for all [=16= 的相关代码(我用三点代替了我认为与我的问题无关的代码)代码]s(后来负责管理每个事件的状态)。

public class BoxOffice extends AbstractActor {

    ...
    private Timeout timeout;
    final static String NAME = "boxOffice";

    //create child actors
    private ActorRef createTicketSeller(String name) {
        return getContext().actorOf(TicketSeller.props(name));
    }

    public BoxOffice(Timeout timeout) {
        this.timeout = timeout;
    }

    //the only method of an actor
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                ...
                ...
                .match(GetEvent.class, this::receiveMsgGetEvent)
                .match(GetEvents.class, this::receiveMsgGetEvents)
                ...
                .build();
    }

    ...

    private void receiveMsgGetEvent(GetEvent getEvent) {
        Optional<ActorRef> maybeChild = getChildByName(getEvent.getName());
        log.info(String.format("Asking for event %s. Child is present: %s", getEvent.getName(), maybeChild.isPresent()));
        OptionalConsumer.of(maybeChild)
                .ifPresent(child -> child.forward(new TicketSeller.GetEvent(), getContext()))
                .ifNotPresent(() -> getSender().tell(Optional.empty(), getSelf()));
    }

    private void receiveMsgGetEvents(GetEvents getEvents) {
        //ask self() for each of the passed-in event
        List<CompletableFuture<Optional<Event>>> listFutureMaybeEvent =
                allChildrenStream()
                .map(child ->
                        ask(getSelf(), new GetEvent(child.path().name()), timeout)
                        .thenApply(obj -> (Optional<Event>) obj)
                        .toCompletableFuture())
                .collect(toList());

        CompletableFuture<Events> eventsFuture = toFutureEvents(listFutureMaybeEvent);
        pipe(eventsFuture, getContext().dispatcher()).to(sender());
    }

    private Stream<ActorRef> allChildrenStream() {
        return StreamSupport.stream(getContext().getChildren().spliterator(), false);
    }

    ...

    private CompletableFuture<Events> toFutureEvents(List<CompletableFuture<Optional<Event>>> futurePossibleEvents) {
        List<Event> events = futurePossibleEvents.stream()
                .map(CompletableFuture::join)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(toList());
        return CompletableFuture.supplyAsync(() -> new Events(events));
    }

    ...

    private Optional<ActorRef> getChildByName(String name) {
        return getContext().findChild(name);
    }

    static Props props(Timeout timeout) {
        return Props.create(BoxOffice.class, () -> new BoxOffice(timeout));
    }

基本上发生的事情是,在 receiveMsgGetEvents 中,我向 self 发送了一条包含 child 名称 child.path.name 的消息。但是,当我收到该消息时(分别在 receiveMsgGetEvent 中),无法通过该名称找到 child 演员:

INFO  [BoxOffice]: Asking for event $a. Child is present: false

同样值得注意的是,发送 GetEvent 和被同一个演员接收之间需要很长时间(大约几秒,但我感觉不到 20)。

问题可能是由于我的 CompletableFutures 操作造成的,但我已尝试重现 scala 等效代码。

上面的信息日志以及这条消息:

INFO  [DeadLetterActorRef]: Message [java.util.Optional] from Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585] to Actor[akka://mycompanyAkkaDemo/deadLetters] was not delivered. [1] dead letters encountered. This logging...

在配置超时(20 秒)后打印的堆栈跟踪之后打印:

ERROR [ActorSystemImpl]: Error during processing of request: 'Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvents".'. Completing with 500 Internal Server Error response. To change default exception handling behavior, provide a custom ExceptionHandler.
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvents".
    at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout(AskSupport.scala:595)
    at akka.pattern.PromiseActorRef$.$anonfun$apply(AskSupport.scala:605)
    at akka.actor.Scheduler$$anon.run(Scheduler.scala:140)
    ...
    at java.lang.Thread.run(Thread.java:748)
ERROR [OneForOneStrategy]: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvent".
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvent".
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
    ...
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://mycompanyAkkaDemo/user/boxOffice#1554115585]] after [20000 ms]. Sender[null] sent message of type "com.mycompany.demo.messages.boxoffice.GetEvent".
    at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout(AskSupport.scala:595)
    ... 11 common frames omitted

这里出了什么问题是调度程序阻塞了。

在 JVM 上,由操作系统线程支持的线程,它们在内存和进程调度程序开销方面都很昂贵。 Akka 的优点之一是它允许更有效地使用线程,允许您 运行 在较少数量的线程上使用许多 actor。

这很好,但确实意味着您永远不应该在 actor 中执行阻塞调用。此处的 CompletableFuture::join 调用正在阻塞,这可能是您遇到问题的原因。

通过避免阻塞调用和使用异步 API(例如 CompletableFuture.allOf),您的问题应该会消失。