Akka.net DDD 聚合协调器(存储库)的钝化
Akka.net Passivation for DDD Aggregate Coordinator (repository)
我正在使用 Akka.net 并希望实现 'DDD repository' 的响应式等效项,根据我在此处看到的内容 http://qnalist.com/questions/5585484/ddd-eventsourcing-with-akka-persistence and https://gitter.im/petabridge/akka-bootcamp/archives/2015/06/25
我理解有一个协调器的想法,该协调器根据一些实时 in-memory 计数或一些经过的时间在内存中保留多个演员。
作为总结(基于上面的链接)我正在尝试:
- 创建一个 returns 根据请求聚合的聚合协调器(针对每种参与者类型)。
- 每个聚合使用 Context.SetReceiveTimeout 方法来识别它是否在一段时间内未被使用。如果是这样,它将收到 ReceiveTimeout 消息。
- 收到超时消息后,Child 将向协调器发送回 Passivate 消息(这反过来会导致协调器关闭 child)。
- 当 child 被关闭时,发送到 child 的所有消息都被协调器拦截并缓冲。
- 一旦 child 的关闭被确认(在协调器中),如果有缓冲的消息 child 它被重新创建并且所有消息都刷新到重新创建的 child .
如何拦截试图发送到 child 的消息(第 4 步)并将它们路由到 parent?或者换句话说,我希望 child 在发送 Passivate 消息时也说 "hey don't send me anymore messages, send them to my parent instead".
这将节省我通过协调器路由所有内容(或者我以错误的方式处理它并且不可能进行消息拦截,而是应该通过 parent 代理所有内容) ?
我有我的消息合同:
public class GetActor
{
public readonly string Identity;
public GetActor(string identity)
{
Identity = identity;
}
}
public class GetActorReply
{
public readonly IActorRef ActorRef;
public GetActorReply(IActorRef actorRef)
{
ActorRef = actorRef;
}
}
public class Passivate // sent from child aggregate to parent coordinator
{
}
协调器class,对于每个聚合类型都有一个唯一的实例:
public class ActorLifetimeCoordinator r<T> : ReceiveActor where T : ActorBase
{
protected Dictionary<Identity,IActorRef> Actors = new Dictionary<Identity, IActorRef>();
protected Dictionary<Identity, List<object>> BufferedMsgs = new Dictionary<Identity, List<object>>();
public ActorLifetimeCoordinator()
{
Receive<GetActor>(message =>
{
var actor = GetActor(message.Identity);
Sender.Tell(new GetActorReply(actor), Self); // reply with the retrieved actor
});
Receive<Passivate>(message =>
{
var actorToUnload = Context.Sender;
var task = actorToUnload.GracefulStop(TimeSpan.FromSeconds(10));
// the time between the above and below lines, we need to intercept messages to the child that is being
// removed from memory - how to do this?
task.Wait(); // dont block thread, use pipeto instead?
});
}
protected IActorRef GetActor(string identity)
{
IActorRef value;
return Actors.TryGetValue(identity, out value)
? value : Context.System.ActorOf(Props.Create<T>(identity));
}
}
聚合基数class,所有聚合均源自:
public abstract class AggregateRoot : ReceivePersistentActor
{
private readonly DispatchByReflectionStrategy _dispatchStrategy
= new DispatchByReflectionStrategy("When");
protected AggregateRoot(Identity identity)
{
PersistenceId = Context.Parent.Path.Name + "/" + Self.Path.Name + "/" + identity;
Recover((Action<IDomainEvent>)Dispatch);
Command<ReceiveTimeout>(message =>
{
Context.Parent.Tell(new Passivate());
});
Context.SetReceiveTimeout(TimeSpan.FromMinutes(5));
}
public override string PersistenceId { get; }
private void Dispatch(IDomainEvent domainEvent)
{
_dispatchStrategy.Dispatch(this, domainEvent);
}
protected void Emit(IDomainEvent domainEvent)
{
Persist(domainEvent, success =>
{
Dispatch(domainEvent);
});
}
}
这里最简单(但不是最简单)的选择是使用 Akka.Cluster.Sharding 模块,它涵盖了协调器模式的领域,支持参与者在集群中的分布和平衡。
如果您选择不需要它,很遗憾,您需要通过协调器传递消息 - 消息本身需要提供用于确定收件人的标识符。否则你可能最终会向死去的演员发送消息。
我正在使用 Akka.net 并希望实现 'DDD repository' 的响应式等效项,根据我在此处看到的内容 http://qnalist.com/questions/5585484/ddd-eventsourcing-with-akka-persistence and https://gitter.im/petabridge/akka-bootcamp/archives/2015/06/25
我理解有一个协调器的想法,该协调器根据一些实时 in-memory 计数或一些经过的时间在内存中保留多个演员。
作为总结(基于上面的链接)我正在尝试:
- 创建一个 returns 根据请求聚合的聚合协调器(针对每种参与者类型)。
- 每个聚合使用 Context.SetReceiveTimeout 方法来识别它是否在一段时间内未被使用。如果是这样,它将收到 ReceiveTimeout 消息。
- 收到超时消息后,Child 将向协调器发送回 Passivate 消息(这反过来会导致协调器关闭 child)。
- 当 child 被关闭时,发送到 child 的所有消息都被协调器拦截并缓冲。
- 一旦 child 的关闭被确认(在协调器中),如果有缓冲的消息 child 它被重新创建并且所有消息都刷新到重新创建的 child .
如何拦截试图发送到 child 的消息(第 4 步)并将它们路由到 parent?或者换句话说,我希望 child 在发送 Passivate 消息时也说 "hey don't send me anymore messages, send them to my parent instead".
这将节省我通过协调器路由所有内容(或者我以错误的方式处理它并且不可能进行消息拦截,而是应该通过 parent 代理所有内容) ?
我有我的消息合同:
public class GetActor
{
public readonly string Identity;
public GetActor(string identity)
{
Identity = identity;
}
}
public class GetActorReply
{
public readonly IActorRef ActorRef;
public GetActorReply(IActorRef actorRef)
{
ActorRef = actorRef;
}
}
public class Passivate // sent from child aggregate to parent coordinator
{
}
协调器class,对于每个聚合类型都有一个唯一的实例:
public class ActorLifetimeCoordinator r<T> : ReceiveActor where T : ActorBase
{
protected Dictionary<Identity,IActorRef> Actors = new Dictionary<Identity, IActorRef>();
protected Dictionary<Identity, List<object>> BufferedMsgs = new Dictionary<Identity, List<object>>();
public ActorLifetimeCoordinator()
{
Receive<GetActor>(message =>
{
var actor = GetActor(message.Identity);
Sender.Tell(new GetActorReply(actor), Self); // reply with the retrieved actor
});
Receive<Passivate>(message =>
{
var actorToUnload = Context.Sender;
var task = actorToUnload.GracefulStop(TimeSpan.FromSeconds(10));
// the time between the above and below lines, we need to intercept messages to the child that is being
// removed from memory - how to do this?
task.Wait(); // dont block thread, use pipeto instead?
});
}
protected IActorRef GetActor(string identity)
{
IActorRef value;
return Actors.TryGetValue(identity, out value)
? value : Context.System.ActorOf(Props.Create<T>(identity));
}
}
聚合基数class,所有聚合均源自:
public abstract class AggregateRoot : ReceivePersistentActor
{
private readonly DispatchByReflectionStrategy _dispatchStrategy
= new DispatchByReflectionStrategy("When");
protected AggregateRoot(Identity identity)
{
PersistenceId = Context.Parent.Path.Name + "/" + Self.Path.Name + "/" + identity;
Recover((Action<IDomainEvent>)Dispatch);
Command<ReceiveTimeout>(message =>
{
Context.Parent.Tell(new Passivate());
});
Context.SetReceiveTimeout(TimeSpan.FromMinutes(5));
}
public override string PersistenceId { get; }
private void Dispatch(IDomainEvent domainEvent)
{
_dispatchStrategy.Dispatch(this, domainEvent);
}
protected void Emit(IDomainEvent domainEvent)
{
Persist(domainEvent, success =>
{
Dispatch(domainEvent);
});
}
}
这里最简单(但不是最简单)的选择是使用 Akka.Cluster.Sharding 模块,它涵盖了协调器模式的领域,支持参与者在集群中的分布和平衡。
如果您选择不需要它,很遗憾,您需要通过协调器传递消息 - 消息本身需要提供用于确定收件人的标识符。否则你可能最终会向死去的演员发送消息。