如何对演员集合进行查询

How to do queries on a collection of actors

我有一个目前接受 commands/messages 的演员系统。这些 actor 的状态被持久化 Akka.Persistance。我们现在要为这个 actor 系统构建查询系统。基本上我们的问题是我们想要有一种方法来获得这些特定参与者的所有状态的 aggregate/list。虽然我没有严格遵守 CQRS 模式,但我认为这可能是一种很好的解决方法。

我最初的想法是让一个用于查询的 actor 将正在执行 "data writes" 的其他 actor 的状态聚合作为其状态的一部分。为此,这个 actor 将订阅它感兴趣的 actor,这些 actor 只会在它们经历某种状态更改时向查询 actor 发送它们的状态。这是解决这个问题的方法吗?有更好的方法吗?

对于实施此类模式,我的建议是在此处为您的演员组合使用 pub-sub 和 push-and-pull 消息传递。

对于每个 "aggregate," 这个演员应该能够订阅来自您要查询的个人 child 个演员的事件。每当 child 的状态发生变化时,一条消息就会被推送到所有订阅的聚合中,并且每个聚合的状态都会自动更新。

当一个新的聚合上线并需要检索它错过的状态(从它存在之前)它应该能够从每个 child 中提取当前状态并使用它来构建其当前状态,使用children 的增量更新将继续保持其对 children 状态的聚合视图一致。

这是我用于此类工作的模式,它在本地开箱即用。通过网络,您可能必须确保可交付性保证,这通常很容易做到。你可以在那里阅读更多关于如何做到这一点的信息:https://petabridge.com/blog/akkadotnet-at-least-once-message-delivery/

一些 Akka.Persistence 后端(即那些与 SQL 一起工作的后端)也实现了称为 Akka.Persistence.Query 的东西。它允许您订阅生成的事件流,并将其用作 Akka.Streams 语义的来源。

如果您使用 SQL-journals,则需要 Akka.Persistence.Query.SqlAkka.Streams 个包。从那里您可以为特定演员创建实时(这意味着不断更新)事件源,并将其用于您喜欢的任何操作,即打印它们:

using (var system = ActorSystem.Create("system"))
using (var materializer = system.Materializer())
{
    var queries = Sys.ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier)

    queries.EventsByPersistenceId("<persistence-id>", 0, long.MaxValue)
        .Select(envelope => envelope.Event)
        .RunForEach(e => Console.WriteLine(e), materializer);
}