如何使用 Axon 框架获取所有聚合?

How to get all aggregates with Axon framework?

我刚开始使用 Axon 框架,遇到了一些障碍。

虽然我可以使用它们的 ID 加载单个聚合,但我不知道如何获取所有聚合的列表或所有聚合 ID 的列表。

EventSourcingRepositoryclass只有load()个方法return一个集合。

有没有办法获取所有聚合(ID),或者我是否应该在轴突之外保留所有聚合 ID 的列表?

为了简单起见,我现在只使用 InMemoryEventStorageEngine。 我正在使用 Axon 3.0.7.

首先,我想知道您为什么要从 Repository 中检索所有聚合的完整列表。 Repository 接口设置为您可以加载 Aggregate 来处理命令或创建新的 Aggregate.

问你的问题,我几乎猜你是用它来查询而不是命令处理。 然而,这不是 EventSourcingRepository 的预期用途。

我能想到的一个你想要这个的原因是,你想实现一个 API 调用来向应用程序中所有 Aggregates 的特定类型发布命令。 以那个场景为例,是的,您需要自己存储 aggregateId 引用。

但总结一下我之前的问题:为什么要通过 Repository 接口检索聚合列表?

回答更新

关于您的评论,我已将以下内容添加到我的回答中:

Axon 帮助您在设置应用程序时考虑到事件源,同时也考虑了 CQRS(命令查询责任分离)。 因此,这意味着您的应用程序的命令端和查询端被分开了。

聚合 Repository 是应用程序的命令端,您可以在其中请求执行操作。 因此,它不提供聚合列表,因为命令是对 a 聚合的意图表达。因此,它只需要 Repository 用户检索一个聚合或创建一个聚合。

您所需要的聚合列表示例是应用程序的查询端。 查询端(您的 views/entities)通常根据事件(通过事件获取)进行更新。 对于应用程序中的 任何 查询要求,您通常会引入一个单独的视图来满足您的需求。

在您的示例中,这意味着您将引入一个事件处理组件,监听您的聚合事件,它使用聚合的查询模型更新存储库。

传递给 EventSourcingRepositoryEventStore 实现了 StreamableMessageSource<M extends Message<?>>,这是一种获取聚合的方法。

虽然使用事件处理组件的框架方式可能会更好地扩展(取决于它的使用方式/上下文),但我很确定事件处理组件无论如何都是由 StreamableMessageSource<M extends Message<?>> 驱动的.所以如果我们想跳过框架直接进入,我们可以这样做:

    List<String> aggregates(StreamableMessageSource<Message<?>> eventStore) {
        return immediatelyAvailableStream(eventStore.openStream(
                eventStore.createTailToken() /* All events in the event store */
        ))
                .filter(e -> e instanceof DomainEventMessage)
                .map(e -> (DomainEventMessage) e)
                .map(DomainEventMessage::getAggregateIdentifier)
                .distinct()
                .collect(Collectors.toList());
    }

    /*
        Note that the stream returned by BlockingStream.asStream() will block / won't terminate
        as it waits for future elements.
     */
    static <M> Stream<M> immediatelyAvailableStream(final BlockingStream<M> messageStream) {
        Iterator<M> iterator = new Iterator<M>() {
            @Override
            public boolean hasNext() {
                return messageStream.hasNextAvailable();
            }

            @Override
            public M next() {
                try {
                    return messageStream.nextAvailable();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Didn't expect to be interrupted");
                }
            }
        };

        Spliterator<M> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
        Stream stream = StreamSupport.stream(spliterator, false);
        return (Stream)stream.onClose(messageStream::close);
    }