检索一个 Akka actor 或如果它不存在则创建它
Retrieve an Akka actor or create it if it does not exist
我正在开发一个应用程序,它创建一些 Akka 参与者来管理和处理来自 Kafka 主题的消息。具有相同密钥的消息由相同的参与者处理。我也是用message key来命名对应的actor
当从主题中读取一条新消息时,我不知道id等于消息键的actor是否已经被actor系统创建了.因此,我尝试使用它的名字来解析 actor,如果它不存在,我就创建它。我需要管理关于参与者解析的并发性。所以有可能不止一个客户端询问 actor 系统 actor 是否存在。
我现在使用的代码如下:
private CompletableFuture<ActorRef> getActor(String uuid) {
return system.actorSelection(String.format("/user/%s", uuid))
.resolveOne(Duration.ofMillis(1000))
.toCompletableFuture()
.exceptionally(ex ->
system.actorOf(Props.create(MyActor.class, uuid), uuid))
.exceptionally(ex -> {
try {
return system.actorSelection(String.format("/user/%s",uuid)).resolveOne(Duration.ofMillis(1000)).toCompletableFuture().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
以上代码没有优化,异常处理可以做得更好
但是,在 Akka 中是否有更惯用的方法来解析 actor,或者如果它不存在则创建它?我错过了什么吗?
考虑创建一个 actor,将消息 ID 映射到 ActorRef
s 作为其状态。这个 "receptionist" actor 将处理所有获取消息处理 actor 的请求。当接待员收到对演员的请求(该请求将包含消息 ID)时,它会尝试在其映射中查找关联的演员:如果找到这样的演员,它 returns ActorRef
发件人;否则它会创建一个新的处理 actor,将该 actor 添加到其映射中,并且 returns 该 actor 引用发送者。
Jeffrey Chung 的回答确实是Akka 方式。这种方法的缺点是性能低下。最高效的解决方案是使用 Java 的 ConcurrentHashMap.computeIfAbsent() 方法。
我会考虑使用 akka-cluster
和 akka-cluster-sharding
。首先,这为您提供了 吞吐量 以及可靠性。但是,它也会让系统管理 'entity' 演员的创建。
但你必须改变与那些演员交谈的方式。您创建了一个 ShardRegion
actor 来处理所有消息:
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
import akka.cluster.sharding.ShardRegion;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class MyEventReceiver extends AbstractActor {
private final ActorRef shardRegion;
public static Props props() {
return Props.create(MyEventReceiver.class, MyEventReceiver::new);
}
static ShardRegion.MessageExtractor messageExtractor
= new ShardRegion.HashCodeMessageExtractor(100) {
// using the supplied hash code extractor to shard
// the actors based on the hashcode of the entityid
@Override
public String entityId(Object message) {
if (message instanceof EventInput) {
return ((EventInput) message).uuid().toString();
}
return null;
}
@Override
public Object entityMessage(Object message) {
if (message instanceof EventInput) {
return message;
}
return message; // I don't know why they do this it's in the sample
}
};
public MyEventReceiver() {
ActorSystem system = getContext().getSystem();
ClusterShardingSettings settings =
ClusterShardingSettings.create(system);
// this is setup for the money shot
shardRegion = ClusterSharding.get(system)
.start("EventShardingSytem",
Props.create(EventActor.class),
settings,
messageExtractor);
}
@Override
public Receive createReceive() {
return receiveBuilder().match(
EventInput.class,
e -> {
log.info("Got an event with UUID {} forwarding ... ",
e.uuid());
// the money shot
deviceRegion.tell(e, getSender());
}
).build();
}
}
所以这个 Actor MyEventReceiver
在你集群的所有节点上运行,并封装了 shardRegion
Actor。您不再直接向您的 EventActor
发送消息,而是使用 MyEventReceiver
和 deviceRegion
Actor,您使用分片系统跟踪集群中的哪个节点 特定 EventActor
继续生活。如果 none 之前已经创建过,它将创建一个,如果已经创建过,它将发送消息。每个 EventActor
必须有一个唯一的 id:它是从 message 中提取的(所以 UUID
对此非常好,但它可能是其他一些 id,例如 customerID、orderID 或其他任何东西,只要它对于您要处理它的 Actor 实例而言是唯一的)。
(我省略了 EventActor
代码,否则它是一个非常普通的 Actor,具体取决于您使用它做什么,'magic' 在上面的代码中)。
分片系统会根据您选择的算法自动知道创建 EventActor
并将其分配给分片(在这种特殊情况下,它基于 hashCode
唯一 ID,这是我用过的所有 ID)。此外,对于任何给定的唯一 ID,您只能 保证 一个 演员。消息被透明地路由到正确的节点和分片,无论它在哪里;从发送它的任何节点和分片。
Akka 站点和文档中有更多信息和示例代码。
这是确保相同的 Entity/Actor 始终处理发送给它的消息的非常棒的方法。集群和分片会自动处理正确分配 Actors 和故障转移等(如果 Actor 有一堆与之关联的严格状态,则必须添加 akka-persistence
以获得钝化、再水化和故障转移(必须恢复))。
我正在开发一个应用程序,它创建一些 Akka 参与者来管理和处理来自 Kafka 主题的消息。具有相同密钥的消息由相同的参与者处理。我也是用message key来命名对应的actor
当从主题中读取一条新消息时,我不知道id等于消息键的actor是否已经被actor系统创建了.因此,我尝试使用它的名字来解析 actor,如果它不存在,我就创建它。我需要管理关于参与者解析的并发性。所以有可能不止一个客户端询问 actor 系统 actor 是否存在。
我现在使用的代码如下:
private CompletableFuture<ActorRef> getActor(String uuid) {
return system.actorSelection(String.format("/user/%s", uuid))
.resolveOne(Duration.ofMillis(1000))
.toCompletableFuture()
.exceptionally(ex ->
system.actorOf(Props.create(MyActor.class, uuid), uuid))
.exceptionally(ex -> {
try {
return system.actorSelection(String.format("/user/%s",uuid)).resolveOne(Duration.ofMillis(1000)).toCompletableFuture().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
以上代码没有优化,异常处理可以做得更好
但是,在 Akka 中是否有更惯用的方法来解析 actor,或者如果它不存在则创建它?我错过了什么吗?
考虑创建一个 actor,将消息 ID 映射到 ActorRef
s 作为其状态。这个 "receptionist" actor 将处理所有获取消息处理 actor 的请求。当接待员收到对演员的请求(该请求将包含消息 ID)时,它会尝试在其映射中查找关联的演员:如果找到这样的演员,它 returns ActorRef
发件人;否则它会创建一个新的处理 actor,将该 actor 添加到其映射中,并且 returns 该 actor 引用发送者。
Jeffrey Chung 的回答确实是Akka 方式。这种方法的缺点是性能低下。最高效的解决方案是使用 Java 的 ConcurrentHashMap.computeIfAbsent() 方法。
我会考虑使用 akka-cluster
和 akka-cluster-sharding
。首先,这为您提供了 吞吐量 以及可靠性。但是,它也会让系统管理 'entity' 演员的创建。
但你必须改变与那些演员交谈的方式。您创建了一个 ShardRegion
actor 来处理所有消息:
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
import akka.cluster.sharding.ShardRegion;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class MyEventReceiver extends AbstractActor {
private final ActorRef shardRegion;
public static Props props() {
return Props.create(MyEventReceiver.class, MyEventReceiver::new);
}
static ShardRegion.MessageExtractor messageExtractor
= new ShardRegion.HashCodeMessageExtractor(100) {
// using the supplied hash code extractor to shard
// the actors based on the hashcode of the entityid
@Override
public String entityId(Object message) {
if (message instanceof EventInput) {
return ((EventInput) message).uuid().toString();
}
return null;
}
@Override
public Object entityMessage(Object message) {
if (message instanceof EventInput) {
return message;
}
return message; // I don't know why they do this it's in the sample
}
};
public MyEventReceiver() {
ActorSystem system = getContext().getSystem();
ClusterShardingSettings settings =
ClusterShardingSettings.create(system);
// this is setup for the money shot
shardRegion = ClusterSharding.get(system)
.start("EventShardingSytem",
Props.create(EventActor.class),
settings,
messageExtractor);
}
@Override
public Receive createReceive() {
return receiveBuilder().match(
EventInput.class,
e -> {
log.info("Got an event with UUID {} forwarding ... ",
e.uuid());
// the money shot
deviceRegion.tell(e, getSender());
}
).build();
}
}
所以这个 Actor MyEventReceiver
在你集群的所有节点上运行,并封装了 shardRegion
Actor。您不再直接向您的 EventActor
发送消息,而是使用 MyEventReceiver
和 deviceRegion
Actor,您使用分片系统跟踪集群中的哪个节点 特定 EventActor
继续生活。如果 none 之前已经创建过,它将创建一个,如果已经创建过,它将发送消息。每个 EventActor
必须有一个唯一的 id:它是从 message 中提取的(所以 UUID
对此非常好,但它可能是其他一些 id,例如 customerID、orderID 或其他任何东西,只要它对于您要处理它的 Actor 实例而言是唯一的)。
(我省略了 EventActor
代码,否则它是一个非常普通的 Actor,具体取决于您使用它做什么,'magic' 在上面的代码中)。
分片系统会根据您选择的算法自动知道创建 EventActor
并将其分配给分片(在这种特殊情况下,它基于 hashCode
唯一 ID,这是我用过的所有 ID)。此外,对于任何给定的唯一 ID,您只能 保证 一个 演员。消息被透明地路由到正确的节点和分片,无论它在哪里;从发送它的任何节点和分片。
Akka 站点和文档中有更多信息和示例代码。
这是确保相同的 Entity/Actor 始终处理发送给它的消息的非常棒的方法。集群和分片会自动处理正确分配 Actors 和故障转移等(如果 Actor 有一堆与之关联的严格状态,则必须添加 akka-persistence
以获得钝化、再水化和故障转移(必须恢复))。