如何使用 Axon 框架获取所有聚合?
How to get all aggregates with Axon framework?
我刚开始使用 Axon 框架,遇到了一些障碍。
虽然我可以使用它们的 ID 加载单个聚合,但我不知道如何获取所有聚合的列表或所有聚合 ID 的列表。
EventSourcingRepository
class只有load()
个方法return一个集合。
有没有办法获取所有聚合(ID),或者我是否应该在轴突之外保留所有聚合 ID 的列表?
为了简单起见,我现在只使用 InMemoryEventStorageEngine
。
我正在使用 Axon 3.0.7.
首先,我想知道您为什么要从 Repository
中检索所有聚合的完整列表。
Repository
接口设置为您可以加载 Aggregate
来处理命令或创建新的 Aggregate
.
问你的问题,我几乎猜你是用它来查询而不是命令处理。
然而,这不是 EventSourcingRepository
的预期用途。
我能想到的一个你想要这个的原因是,你想实现一个 API 调用来向应用程序中所有 Aggregates
的特定类型发布命令。
以那个场景为例,是的,您需要自己存储 aggregateId 引用。
但总结一下我之前的问题:为什么要通过 Repository
接口检索聚合列表?
回答更新
关于您的评论,我已将以下内容添加到我的回答中:
Axon 帮助您在设置应用程序时考虑到事件源,同时也考虑了 CQRS(命令查询责任分离)。
因此,这意味着您的应用程序的命令端和查询端被分开了。
聚合 Repository
是应用程序的命令端,您可以在其中请求执行操作。
因此,它不提供聚合列表,因为命令是对 a 聚合的意图表达。因此,它只需要 Repository
用户检索一个聚合或创建一个聚合。
您所需要的聚合列表示例是应用程序的查询端。
查询端(您的 views/entities)通常根据事件(通过事件获取)进行更新。
对于应用程序中的 任何 查询要求,您通常会引入一个单独的视图来满足您的需求。
在您的示例中,这意味着您将引入一个事件处理组件,监听您的聚合事件,它使用聚合的查询模型更新存储库。
传递给 EventSourcingRepository
的 EventStore
实现了 StreamableMessageSource<M extends Message<?>>
,这是一种获取聚合的方法。
虽然使用事件处理组件的框架方式可能会更好地扩展(取决于它的使用方式/上下文),但我很确定事件处理组件无论如何都是由 StreamableMessageSource<M extends Message<?>>
驱动的.所以如果我们想跳过框架直接进入,我们可以这样做:
List<String> aggregates(StreamableMessageSource<Message<?>> eventStore) {
return immediatelyAvailableStream(eventStore.openStream(
eventStore.createTailToken() /* All events in the event store */
))
.filter(e -> e instanceof DomainEventMessage)
.map(e -> (DomainEventMessage) e)
.map(DomainEventMessage::getAggregateIdentifier)
.distinct()
.collect(Collectors.toList());
}
/*
Note that the stream returned by BlockingStream.asStream() will block / won't terminate
as it waits for future elements.
*/
static <M> Stream<M> immediatelyAvailableStream(final BlockingStream<M> messageStream) {
Iterator<M> iterator = new Iterator<M>() {
@Override
public boolean hasNext() {
return messageStream.hasNextAvailable();
}
@Override
public M next() {
try {
return messageStream.nextAvailable();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Didn't expect to be interrupted");
}
}
};
Spliterator<M> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
Stream stream = StreamSupport.stream(spliterator, false);
return (Stream)stream.onClose(messageStream::close);
}
我刚开始使用 Axon 框架,遇到了一些障碍。
虽然我可以使用它们的 ID 加载单个聚合,但我不知道如何获取所有聚合的列表或所有聚合 ID 的列表。
EventSourcingRepository
class只有load()
个方法return一个集合。
有没有办法获取所有聚合(ID),或者我是否应该在轴突之外保留所有聚合 ID 的列表?
为了简单起见,我现在只使用 InMemoryEventStorageEngine
。
我正在使用 Axon 3.0.7.
首先,我想知道您为什么要从 Repository
中检索所有聚合的完整列表。
Repository
接口设置为您可以加载 Aggregate
来处理命令或创建新的 Aggregate
.
问你的问题,我几乎猜你是用它来查询而不是命令处理。
然而,这不是 EventSourcingRepository
的预期用途。
我能想到的一个你想要这个的原因是,你想实现一个 API 调用来向应用程序中所有 Aggregates
的特定类型发布命令。
以那个场景为例,是的,您需要自己存储 aggregateId 引用。
但总结一下我之前的问题:为什么要通过 Repository
接口检索聚合列表?
回答更新
关于您的评论,我已将以下内容添加到我的回答中:
Axon 帮助您在设置应用程序时考虑到事件源,同时也考虑了 CQRS(命令查询责任分离)。 因此,这意味着您的应用程序的命令端和查询端被分开了。
聚合 Repository
是应用程序的命令端,您可以在其中请求执行操作。
因此,它不提供聚合列表,因为命令是对 a 聚合的意图表达。因此,它只需要 Repository
用户检索一个聚合或创建一个聚合。
您所需要的聚合列表示例是应用程序的查询端。 查询端(您的 views/entities)通常根据事件(通过事件获取)进行更新。 对于应用程序中的 任何 查询要求,您通常会引入一个单独的视图来满足您的需求。
在您的示例中,这意味着您将引入一个事件处理组件,监听您的聚合事件,它使用聚合的查询模型更新存储库。
传递给 EventSourcingRepository
的 EventStore
实现了 StreamableMessageSource<M extends Message<?>>
,这是一种获取聚合的方法。
虽然使用事件处理组件的框架方式可能会更好地扩展(取决于它的使用方式/上下文),但我很确定事件处理组件无论如何都是由 StreamableMessageSource<M extends Message<?>>
驱动的.所以如果我们想跳过框架直接进入,我们可以这样做:
List<String> aggregates(StreamableMessageSource<Message<?>> eventStore) {
return immediatelyAvailableStream(eventStore.openStream(
eventStore.createTailToken() /* All events in the event store */
))
.filter(e -> e instanceof DomainEventMessage)
.map(e -> (DomainEventMessage) e)
.map(DomainEventMessage::getAggregateIdentifier)
.distinct()
.collect(Collectors.toList());
}
/*
Note that the stream returned by BlockingStream.asStream() will block / won't terminate
as it waits for future elements.
*/
static <M> Stream<M> immediatelyAvailableStream(final BlockingStream<M> messageStream) {
Iterator<M> iterator = new Iterator<M>() {
@Override
public boolean hasNext() {
return messageStream.hasNextAvailable();
}
@Override
public M next() {
try {
return messageStream.nextAvailable();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Didn't expect to be interrupted");
}
}
};
Spliterator<M> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
Stream stream = StreamSupport.stream(spliterator, false);
return (Stream)stream.onClose(messageStream::close);
}