Akka.NET 高效查询演员
Akka.NET query actors efficiently
我正在使用 Akka.NET 为生产项目创建概念证明,但我遇到了查询概念理解问题。
情况如下:
CoordinatorActor 有一个包含数千个 Hotel-Actors
.
的列表
我想查询所有 Hotel-Actors
在特定日期有空房的所有酒店。
当然,我可以通过它们进行 foreach 并发送 .Ask<>
请求以获取特定日期。持有所有任务的参考并执行 Task.WhenAll(requests)
。但这感觉有点不自然。
或者,我可以一次向所有酒店(ActorSelection 或路由器)发送一个请求特定日期的广播消息,但我不知道他们什么时候 all用 Tell
消息回复。
有没有人建议如何解决这个问题?
是的,你的感受就在这里。使用 Ask 在 actor 之间进行通信被认为是非常低效的——主要是因为每个 ask 都需要分配单独的消息侦听器。
第一个好问题是:您需要等待他们全部回复吗?也许响应可以在响应到来时发出。
如果您需要在回复之前收集所有数据,您需要有某种方法来计算所有消息以确保其中一些消息是否仍在等待处理 - 在这种情况下使用 ActorSelection
不可行。您需要可以与每条消息相关联的计数器或标识符列表 - 虽然它们甚至可以是普通数字,但通常 IActorRef
s 更容易使用。
下面你可以看到可以为这个特定任务创建的 Aggregator
actor 的简化示例 - 它会自动 return 所有回复,它收到,并在没有更多回复时自行停止要等待的消息或发生超时。
class Aggregator<T> : ReceiveActor
{
private IActorRef originalSender;
private ISet<IActorRef> refs;
public Aggregator(ISet<IActorRef> refs)
{
this.refs = refs;
// this operation will finish after 30 sec of inactivity
// (when no new message arrived)
Context.SetReceiveTimeout(TimeSpan.FromSeconds(30));
ReceiveAny(x =>
{
originalSender = Sender;
foreach (var aref in refs) aref.Tell(x);
Become(Aggregating);
});
}
private void Aggregating()
{
var replies = new List<T>();
// when timeout occurred, we reply with what we've got so far
Receive<ReceiveTimeout>(_ => ReplyAndStop(replies));
Receive<T>(x =>
{
if (refs.Remove(Sender)) replies.Add(x);
if (refs.Count == 0) ReplyAndStop(replies);
});
}
private void ReplyAndStop(List<T> replies)
{
originalSender.Tell(new AggregatedReply<T>(replies));
Context.Stop(Self);
}
}
我正在使用 Akka.NET 为生产项目创建概念证明,但我遇到了查询概念理解问题。
情况如下:
CoordinatorActor 有一个包含数千个 Hotel-Actors
.
我想查询所有 Hotel-Actors
在特定日期有空房的所有酒店。
当然,我可以通过它们进行 foreach 并发送 .Ask<>
请求以获取特定日期。持有所有任务的参考并执行 Task.WhenAll(requests)
。但这感觉有点不自然。
或者,我可以一次向所有酒店(ActorSelection 或路由器)发送一个请求特定日期的广播消息,但我不知道他们什么时候 all用 Tell
消息回复。
有没有人建议如何解决这个问题?
是的,你的感受就在这里。使用 Ask 在 actor 之间进行通信被认为是非常低效的——主要是因为每个 ask 都需要分配单独的消息侦听器。
第一个好问题是:您需要等待他们全部回复吗?也许响应可以在响应到来时发出。
如果您需要在回复之前收集所有数据,您需要有某种方法来计算所有消息以确保其中一些消息是否仍在等待处理 - 在这种情况下使用 ActorSelection
不可行。您需要可以与每条消息相关联的计数器或标识符列表 - 虽然它们甚至可以是普通数字,但通常 IActorRef
s 更容易使用。
下面你可以看到可以为这个特定任务创建的 Aggregator
actor 的简化示例 - 它会自动 return 所有回复,它收到,并在没有更多回复时自行停止要等待的消息或发生超时。
class Aggregator<T> : ReceiveActor
{
private IActorRef originalSender;
private ISet<IActorRef> refs;
public Aggregator(ISet<IActorRef> refs)
{
this.refs = refs;
// this operation will finish after 30 sec of inactivity
// (when no new message arrived)
Context.SetReceiveTimeout(TimeSpan.FromSeconds(30));
ReceiveAny(x =>
{
originalSender = Sender;
foreach (var aref in refs) aref.Tell(x);
Become(Aggregating);
});
}
private void Aggregating()
{
var replies = new List<T>();
// when timeout occurred, we reply with what we've got so far
Receive<ReceiveTimeout>(_ => ReplyAndStop(replies));
Receive<T>(x =>
{
if (refs.Remove(Sender)) replies.Add(x);
if (refs.Count == 0) ReplyAndStop(replies);
});
}
private void ReplyAndStop(List<T> replies)
{
originalSender.Tell(new AggregatedReply<T>(replies));
Context.Stop(Self);
}
}