Akka.net DDD 聚合协调器(存储库)的钝化

Akka.net Passivation for DDD Aggregate Coordinator (repository)

我正在使用 Akka.net 并希望实现 'DDD repository' 的响应式等效项,根据我在此处看到的内容 http://qnalist.com/questions/5585484/ddd-eventsourcing-with-akka-persistence and https://gitter.im/petabridge/akka-bootcamp/archives/2015/06/25

我理解有一个协调器的想法,该协调器根据一些实时 in-memory 计数或一些经过的时间在内存中保留多个演员。

作为总结(基于上面的链接)我正在尝试:

  1. 创建一个 returns 根据请求聚合的聚合协调器(针对每种参与者类型)。
  2. 每个聚合使用 Context.SetReceiveTimeout 方法来识别它是否在一段时间内未被使用。如果是这样,它将收到 ReceiveTimeout 消息。
  3. 收到超时消息后,Child 将向协调器发送回 Passivate 消息(这反过来会导致协调器关闭 child)。
  4. 当 child 被关闭时,发送到 child 的所有消息都被协调器拦截并缓冲。
  5. 一旦 child 的关闭被确认(在协调器中),如果有缓冲的消息 child 它被重新创建并且所有消息都刷新到重新创建的 child .

如何拦截试图发送到 child 的消息(第 4 步)并将它们路由到 parent?或者换句话说,我希望 child 在发送 Passivate 消息时也说 "hey don't send me anymore messages, send them to my parent instead".

这将节省我通过协调器路由所有内容(或者我以错误的方式处理它并且不可能进行消息拦截,而是应该通过 parent 代理所有内容) ?

我有我的消息合同:

public class GetActor
{
    public readonly string Identity;

    public GetActor(string identity)
    {
        Identity = identity;
    }
}

public class GetActorReply
{
    public readonly IActorRef ActorRef;

    public GetActorReply(IActorRef actorRef)
    {
        ActorRef = actorRef;
    }
}

public class Passivate // sent from child aggregate to parent coordinator
{
}

协调器class,对于每个聚合类型都有一个唯一的实例:

public class ActorLifetimeCoordinator r<T> : ReceiveActor where T : ActorBase
{
    protected Dictionary<Identity,IActorRef> Actors = new Dictionary<Identity, IActorRef>();
    protected Dictionary<Identity, List<object>> BufferedMsgs = new Dictionary<Identity, List<object>>();

    public ActorLifetimeCoordinator()
    {
        Receive<GetActor>(message =>
        {
            var actor = GetActor(message.Identity);
            Sender.Tell(new GetActorReply(actor), Self); // reply with the retrieved actor
        });

        Receive<Passivate>(message =>
        {
            var actorToUnload = Context.Sender;
            var task = actorToUnload.GracefulStop(TimeSpan.FromSeconds(10));

            // the time between the above and below lines, we need to intercept messages to the child that is being
            // removed from memory - how to do this?

            task.Wait(); // dont block thread, use pipeto instead?
        });
    }

    protected IActorRef GetActor(string identity)
    {
        IActorRef value;
        return Actors.TryGetValue(identity, out value)
            ? value : Context.System.ActorOf(Props.Create<T>(identity));            
    }
}

聚合基数class,所有聚合均源自:

public abstract class AggregateRoot : ReceivePersistentActor
{
    private readonly DispatchByReflectionStrategy _dispatchStrategy
        = new DispatchByReflectionStrategy("When");      

    protected AggregateRoot(Identity identity)
    {
        PersistenceId = Context.Parent.Path.Name + "/" + Self.Path.Name + "/" + identity;

        Recover((Action<IDomainEvent>)Dispatch);

        Command<ReceiveTimeout>(message =>
        {
            Context.Parent.Tell(new Passivate());    
        });

        Context.SetReceiveTimeout(TimeSpan.FromMinutes(5));
    }

    public override string PersistenceId { get; }

    private void Dispatch(IDomainEvent domainEvent)
    {
        _dispatchStrategy.Dispatch(this, domainEvent);
    }

    protected void Emit(IDomainEvent domainEvent)
    {
        Persist(domainEvent, success =>
        {
            Dispatch(domainEvent);
        });
    }
}

这里最简单(但不是最简单)的选择是使用 Akka.Cluster.Sharding 模块,它涵盖了协调器模式的领域,支持参与者在集群中的分布和平衡。

如果您选择不需要它,很遗憾,您需要通过协调器传递消息 - 消息本身需要提供用于确定收件人的标识符。否则你可能最终会向死去的演员发送消息。