序列化和反序列化域事件以在通用实现中持久化并从事件存储中检索

Serialize and Deserialize domain events to persist and retrieve from Event Store in generic implementation

我正在将 DDD 与 CQRS 和事件溯源结合使用。 我需要在 IEventStore 的自定义实现中使用事件存储(特别是 this event store)来保存和检索域事件,但我在处理 serialization/deserialization 时遇到了困难。

这是我正在实现的接口:

public interface IEventStore
{
    Task<IEnumerable<IDomainEvent>> GetEventsAsync(Identity aggregateIdentity, Type aggregateType);

    Task PersistAsync(IAggregateRoot aggregateRoot, IEnumerable<IDomainEvent> domainEvents);
}

在我的 IEventStore 实现之外,我可以将每个 IDomainEvent 的映射器映射到某些 serializable/deserializable EventDto 或 json 字符串中。那不是问题。 但这些是我的限制:

下面是我的尝试:

public class MyEventStore
    : IEventStore
{
    private readonly IStreamNameFactory _streamNameFactory;
    private readonly IEventStoreConnection _eventStoreConnection; //this is the Greg Young's EventStore product that I want to use as database
    private readonly IDomainEventFactory _domainEventFactory;
    private readonly IEventDataFactory _eventDataFactory;

    public EventStore(
        IStreamNameFactory streamNameFactory, 
        IEventStoreConnection eventStoreConnection, 
        IDomainEventFactory domainEventFactory, 
        IEventDataFactory eventDataFactory)
    {
        _streamNameFactory = streamNameFactory;
        _eventStoreConnection = eventStoreConnection;
        _domainEventFactory = domainEventFactory;
        _eventDataFactory = eventDataFactory;
    }

    public async Task<IEnumerable<IDomainEvent>> GetEventsAsync(
        Identity aggregateIdentity, 
        Type aggregateType)
    {
        var aggregateIdentityValue = aggregateIdentity.Value;
        var streamName = _streamNameFactory.Create(aggregateIdentityValue, aggregateType);

        var streamEventSlice =
            await _eventStoreConnection.ReadStreamEventsForwardAsync(streamName, 0, Int32.MaxValue, false);

        var domainEvents = streamEventSlice
            .Events
            .Select(x => _domainEventFactory.Create(x));

        return domainEvents;
    }

    [SuppressMessage("ReSharper", "PossibleMultipleEnumeration")]
    public async Task PersistAsync(
        IAggregateRoot aggregateRoot, 
        IEnumerable<IDomainEvent> domainEvents)
    {
        var numberOfEvents = domainEvents.Count();
        var aggregateRootVersion = aggregateRoot.Version;
        var originalVersion = aggregateRootVersion - numberOfEvents;
        var expectedVersion = originalVersion - 1;

        var aggregateIdentityValue = aggregateRoot.AggregateIdentity.Value;
        var aggregateRootType = aggregateRoot.GetType();
        var streamName = _streamNameFactory.Create(aggregateIdentityValue, aggregateRootType);
        var assemblyQualifiedName = aggregateRootType.AssemblyQualifiedName;

        var eventsToStore = domainEvents.Select(x => _eventDataFactory.Create(x, assemblyQualifiedName));

        await _eventStoreConnection.AppendToStreamAsync(streamName, expectedVersion, eventsToStore);
    }
}

如您所想,问题主要出在 IDomainEventFactory 实现中。我需要一个实现以下接口的 class:

public interface IDomainEventFactory
{
    IDomainEvent Create(ResolvedEvent resolvedEvent);
}

这个class需要知道它需要在运行时将resolvedEvent反序列化到哪个特定的IDomainEvent。换句话说,如果正在检索的事件是 MyThingCreatedEvent 的 json 表示,也许我可以使用 IMapper<ResolvedEvent, MyThingCreatedEvent> 等服务。但是,如果要检索的事件是 MyThingUpdatedEvent 的 json 表示,那么我将需要诸如 IMapper<ResolvedEvent, MyThingUpdatedEvent> 之类的服务。

我想到了一些方法。

选项 1: 我想我可以让 IDomainEventFactory 实现使用 autofac IComponentContext,这样在运行时我可以设法做一些 _componentContext.Resolve(theNeededType)。但我不知道如何检索我需要的 IMapper。也许这是可能的,但我对此表示怀疑。

选项 2: 也许我可以有一些地图服务,例如 IBetterMapper,例如

public interface IBetterMapping
{
    TDestination Map<TDestination>(object source) where TDestination : class;
}

这样我的工厂就可以将了解如何将任何内容反序列化为 TDestination 的关注委托出去。但我会遇到同样的问题:我不知道如何在运行时从字符串创建类型,例如,做类似 _myBetterMapper.Map<WhichTypeHere> 的事情,还有实现该 Map 方法的额外问题,我猜测需要一些注册 table 并根据类型选择一个或另一个特定的映射器。

我真的被这个困住了。希望我能得到你们的帮助! :)

更新:我已经实现了自己的解决方案并在我的个人仓库中上传了项目:https://gitlab.com/iberodev/DiDrDe.EventStore.Infra.EventStore 我采用的解决方案是保持事件存储包装器不可知,但在 DI 注册时为那些有点 "special" 的事件提供自定义 serializer/deserializer。 EventStore 允许添加自定义元数据 headers,因此我使用一些自定义 headers 来指定每个数据流上的具体实现类型,以便我知道在检索持久化事件时在何处反序列化。

更新的答案:

随着时间的推移,我逐渐意识到整个方法是一种糟糕的做法。我认为域事件永远不应该具有抽象(多态)属性,这些属性可以采用不同的形状,因此在反序列化时会出现问题以确切知道事件被序列化成什么形状。

这个问题不是技术性的(尽管为此,我下面的回答仍然有效)而是哲学性的。

我坚信领域事件应该只使用基本类型。不会改变的东西(字符串、整数,可能是一些 "safe" 自定义类型,例如金钱等)。拥有多态域事件没有多大意义。 如果一个事件可以采取不同的形式,那么我们可能正在谈论不同的事件

重要的是要考虑到一个非常古老的事件(例如:一年前引发的事件)在创建投影时也必须反序列化(例如:在重播期间,或者只是在实例化期间)具有事件源的聚合),因此应该正确反序列化此事件而不会失败。想象一下,如果出于某种原因有人修改了该事件正在使用的 classes 之一,现在旧信息无法反序列化到新 class 中,那将是一团糟。我们将违反事件溯源中最基本的事情。

这就是为什么我认为我们不应该使用具有复杂 objects 的领域事件,除非我们 100% 确定这些 class 不会改变,并且我们不应该使用多态领域事件完全没有。


我已经在 EventStore .NET 客户端上实现了一个包装器,它实现了我的 IEventStore 接口并从幕后的任何事物中抽象出我的客户端应用程序。

public interface IEventStore
{
    Task<IEnumerable<IDomainEvent>> GetEventsAsync(Guid aggregateId, Type aggregateType);
    Task PersistAsync(IAggregateRoot aggregateRoot, IEnumerable<IDomainEvent> domainEvents);
}

我解决 serialization/deserialization 主要问题的方法是为 "special" 的域事件提供自定义 serializer/deserializer(因为它们具有无法实现的抽象或接口属性被反序列化,除非知道它的具体具体类型)。此外,对于每个持续存在的域事件,我保存元数据 headers 说明它是哪个特定的域事件类型以及它是哪个特定的可序列化事件类型。

换句话说,持久化时的流程是这样的: IDomainEvent -> convert to a serializable type (if needed) -> transform in bytes -> save stream data

并且在检索时 Stream Data -> transform to serializable type -> transform to IDomainEvent

我已将整个项目上传到我在 GitLab 的个人存储库中: https://gitlab.com/iberodev/DiDrDe.EventStore.Infra.EventStore ,请随时查看并 运行 所有与 xUnit 的集成和单元测试以了解它。当然,请随时提供任何反馈!

我的解决方案的繁重工作在于需要使用事件存储的客户端部分。它的基础架构层(在其主机应用程序中注册 Autofac)负责使用 Autofac 扩展注册 EventStore,并在需要时提供所需的自定义 serializers/deserializers。

这样我就可以使 EventStore 包装器的实现完全不受特定设置和特定域事件的影响。这是一个通用的解决方案。

项目的README对此进行了说明,但如果领域事件是可序列化的(无抽象属性),基本上事件存储可以这样注册:

var builder = new ContainerBuilder(); // Autofac container
builder
    .RegisterEventStore(
        ctx =>
        {
            var eventStoreOptions =
                new EventStoreOptions
                {
                    ConnectionString = "ConnectTo=tcp://admin:changeit@127.0.0.1:1113; HeartBeatTimeout=500";
                };
            return eventStoreOptions;
        });
var container = builder.Build();

如果存在特殊的域事件,因为它们具有抽象属性,则像这样:

var builder = new ContainerBuilder();
builder
    .RegisterEventStore(
        ctx =>
        {
            var eventStoreOptions =
                new EventStoreOptions
                {
                    ConnectionString = "ConnectTo=tcp://admin:changeit@127.0.0.1:1113; HeartBeatTimeout=500";
                };
            return eventStoreOptions;
        },
        ctx =>
        {
            var customDomainEventMappersOptions =
                new CustomDomainEventMappersOptions()
                    .UsesCustomMappers<FakeDomainEventNotSerializable, FakeSerializableEvent>(
                        domainEvent =>
                        {
                            var mapper =
                                new FakeDomainEventNotSerializableToFakeSerializableEventMapper();
                            var result = mapper.Map(domainEvent);
                            return result;
                        },
                        serializableEvent =>
                        {
                            var mapper =
                                new FakeSerializableEventToFakeDomainEventNotSerializableMapper();
                            var result = mapper.Map(serializableEvent);
                            return result;
                        });
            return customDomainEventMappersOptions;
        });

var container = builder.Build();