检索一个 Akka actor 或如果它不存在则创建它

Retrieve an Akka actor or create it if it does not exist

我正在开发一个应用程序,它创建一些 Akka 参与者来管理和处理来自 Kafka 主题的消息。具有相同密钥的消息由相同的参与者处理。我也是用message key来命名对应的actor

当从主题中读取一条新消息时,我不知道id等于消息键的actor是否已经被actor系统创建了.因此,我尝试使用它的名字来解析 actor,如果它不存在,我就创建它。我需要管理关于参与者解析的并发性。所以有可能不止一个客户端询问 actor 系统 actor 是否存在。

我现在使用的代码如下:

private CompletableFuture<ActorRef> getActor(String uuid) {
    return system.actorSelection(String.format("/user/%s", uuid))
                 .resolveOne(Duration.ofMillis(1000))
                 .toCompletableFuture()
                 .exceptionally(ex -> 
                     system.actorOf(Props.create(MyActor.class, uuid), uuid))
                 .exceptionally(ex -> {
                     try {
                         return system.actorSelection(String.format("/user/%s",uuid)).resolveOne(Duration.ofMillis(1000)).toCompletableFuture().get();
                     } catch (InterruptedException | ExecutionException e) {
                         throw new RuntimeException(e);
                     }
                 });
}

以上代码没有优化,异常处理可以做得更好

但是,在 Akka 中是否有更惯用的方法来解析 actor,或者如果它不存在则创建它?我错过了什么吗?

考虑创建一个 actor,将消息 ID 映射到 ActorRefs 作为其状态。这个 "receptionist" actor 将处理所有获取消息处理 actor 的请求。当接待员收到对演员的请求(该请求将包含消息 ID)时,它会尝试在其映射中查找关联的演员:如果找到这样的演员,它 returns ActorRef发件人;否则它会创建一个新的处理 actor,将该 actor 添加到其映射中,并且 returns 该 actor 引用发送者。

Jeffrey Chung 的回答确实是Akka 方式。这种方法的缺点是性能低下。最高效的解决方案是使用 Java 的 ConcurrentHashMap.computeIfAbsent() 方法。

我会考虑使用 akka-clusterakka-cluster-sharding。首先,这为您提供了 吞吐量 以及可靠性。但是,它也会让系统管理 'entity' 演员的创建。

但你必须改变与那些演员交谈的方式。您创建了一个 ShardRegion actor 来处理所有消息:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
import akka.cluster.sharding.ShardRegion;
import akka.event.Logging;
import akka.event.LoggingAdapter;


public class MyEventReceiver extends AbstractActor {

    private final ActorRef shardRegion;

    public static Props props() {
        return Props.create(MyEventReceiver.class, MyEventReceiver::new);
    }

    static ShardRegion.MessageExtractor messageExtractor 
      = new ShardRegion.HashCodeMessageExtractor(100) {
            // using the supplied hash code extractor to shard
            // the actors based on the hashcode of the entityid

        @Override
        public String entityId(Object message) {
            if (message instanceof EventInput) {
                return ((EventInput) message).uuid().toString();
            }
            return null;
        }

        @Override
        public Object entityMessage(Object message) {
            if (message instanceof EventInput) {
                return message;
            }
            return message; // I don't know why they do this it's in the sample
        }
    };


    public MyEventReceiver() {
        ActorSystem system = getContext().getSystem();
        ClusterShardingSettings settings =
           ClusterShardingSettings.create(system);
        // this is setup for the money shot
        shardRegion = ClusterSharding.get(system)
                .start("EventShardingSytem",
                        Props.create(EventActor.class),
                        settings,
                        messageExtractor);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(
                EventInput.class,
                e -> {
                    log.info("Got an event with UUID {} forwarding ... ",
                            e.uuid());
                    // the money shot
                    deviceRegion.tell(e, getSender());
                }
        ).build();
    }
}

所以这个 Actor MyEventReceiver 在你集群的所有节点上运行,并封装了 shardRegion Actor。您不再直接向您的 EventActor 发送消息,而是使用 MyEventReceiverdeviceRegion Actor,您使用分片系统跟踪集群中的哪个节点 特定 EventActor 继续生活。如果 none 之前已经创建过,它将创建一个,如果已经创建过,它将发送消息。每个 EventActor 必须有一个唯一的 id:它是从 message 中提取的(所以 UUID 对此非常好,但它可能是其他一些 id,例如 customerID、orderID 或其他任何东西,只要它对于您要处理它的 Actor 实例而言是唯一的)。

(我省略了 EventActor 代码,否则它是一个非常普通的 Actor,具体取决于您使用它做什么,'magic' 在上面的代码中)。

分片系统会根据您选择的算法自动知道创建 EventActor 并将其分配给分片(在这种特殊情况下,它基于 hashCode唯一 ID,这是我用过的所有 ID)。此外,对于任何给定的唯一 ID,您只能 保证 一个 演员。消息被透明地路由到正确的节点和分片,无论它在哪里;从发送它的任何节点和分片。

Akka 站点和文档中有更多信息和示例代码。

这是确保相同的 Entity/Actor 始终处理发送给它的消息的非常棒的方法。集群和分片会自动处理正确分配 Actors 和故障转移等(如果 Actor 有一堆与之关联的严格状态,则必须添加 akka-persistence 以获得钝化、再水化和故障转移(必须恢复))。