Akka:PersistenceQuery 如何替代 PersistentView?
Akka: How is PersistenceQuery a replacement for PersistentView?
我们在我们的集群单例代码中使用 PersistentView 来减轻来自读取事件的负载。现在 PersistentView
已被弃用,建议我们使用基于 Stream API 的 PersistentQuery
。我们有:
The consuming actor may be a plain Actor or a PersistentActor if it
needs to store its own state (e.g. fromSequenceNr offset).
The corresponding query type is EventsByPersistenceId. There are
several alternatives for connecting the Source to an actor
corresponding to a previous PersistentView actor:
我的问题是:
在PersistentView
中,事件是在接收块中处理的,我们有一个基于推送的系统。对于 PersistentQuery
,对 EventsByPersistenceId
的每次调用都像是一次拉动。我将如何模拟演员的持续 receive
行为?我应该这样做吗?这真的是应该使用 Streams 的方式吗?
我的理解是,每次调用get EventsByPersistenceId
本质上都是一个查询。因此,执行这些循环查询是否效率低下?
我也想知道为什么 PersistentView
被删除了。这仅仅是一种优化,还是 Akka 迁移到流的更广泛举措的一部分并且存在范式转变?我在尝试用 PersistenceQuery
模拟 PersistentView
行为时犯了错误吗?
我遇到了这个 repo,它似乎在幕后使用 PersistenceQuery
时提供旧的 PersistentView
功能。根据^中的考虑使用它是个好主意吗?
- 正如您提到的,
eventsByPersistenceId
会给您一个 Akka Streams Source
,所以有点不清楚您所说的 "each call" 是什么意思。您从此源定义一个流并将其具体化一次,新事件将由它发出。除其他外,您可以将它们发送给演员,用 mapAsync
和 ask
替换您的 PersistentView
。 http://doc.akka.io/docs/akka/snapshot/scala/persistence-query.html#materialize-view-using-mapasync and http://doc.akka.io/docs/akka/current/scala/stream/stream-integrations.html#mapasync-ask 上解释了这种方法
所以从你的演员的角度来看,它仍然是 "push-based" 并由 receive
处理。
请注意,mapAsync
在第一个参数列表中采用并行度因子。要按事件发生的顺序处理事件,您应该将其设置为“1”(即无并行性)。如果你将它设置为更高的值,比如 n,流将接收 n 事件并将消息并行发送给 actor,这意味着他们最终会以随机顺序进入邮箱。
- 再说一次,"each call" 是什么意思?它很可能只是一个 - 你 run/materialize 启动时的流,它将无限期地流式传输事件。底层日志插件实现很可能会使用轮询,这是正确的。 (我不知道,但我怀疑
PersistentView
也是这种情况?) 所以您肯定不想创建大量这些来源。但是,如果您对来自许多参与者的事件感兴趣,您更有可能标记这些事件,然后使用 eventsByTag
获取具有给定标签的所有事件的来源。
- 当时对此进行了一些讨论。用我自己的话来说,我想说的是,为单个演员提供 view/read 方面的用例并不常见。要基于 Akka Persistence 构建 CQRS 系统,需要一种更强大的方式来消费任何事件集并以任意方式处理它们,这使得流式查询成为更好的选择。用 Akka 团队的话说,设计决策在 Akka Persistence on the Query Side: The Conclusion.
中进行了解释
- 我不知道图书馆,即使我知道,如果不知道你的用例,也就是你在接收演员中实际做了什么,就很难说。就个人而言,我对
PersistenceQuery
感到满意,并且认为没有必要模仿 PersistentView
,尤其是因为将事件从流发送到 actor 相当容易,如 1.[=33 中所述=]
这个 repository 演示了 PersistenceQuery
的用法,并提供了一个非常轻量级的 Akka 已弃用的 PersistentView
与 Stream API.[=13= 的重新实现]
我们在我们的集群单例代码中使用 PersistentView 来减轻来自读取事件的负载。现在 PersistentView
已被弃用,建议我们使用基于 Stream API 的 PersistentQuery
。我们有:
The consuming actor may be a plain Actor or a PersistentActor if it needs to store its own state (e.g. fromSequenceNr offset). The corresponding query type is EventsByPersistenceId. There are several alternatives for connecting the Source to an actor corresponding to a previous PersistentView actor:
我的问题是:
在
PersistentView
中,事件是在接收块中处理的,我们有一个基于推送的系统。对于PersistentQuery
,对EventsByPersistenceId
的每次调用都像是一次拉动。我将如何模拟演员的持续receive
行为?我应该这样做吗?这真的是应该使用 Streams 的方式吗?我的理解是,每次调用get
EventsByPersistenceId
本质上都是一个查询。因此,执行这些循环查询是否效率低下?我也想知道为什么
PersistentView
被删除了。这仅仅是一种优化,还是 Akka 迁移到流的更广泛举措的一部分并且存在范式转变?我在尝试用PersistenceQuery
模拟PersistentView
行为时犯了错误吗?我遇到了这个 repo,它似乎在幕后使用
PersistenceQuery
时提供旧的PersistentView
功能。根据^中的考虑使用它是个好主意吗?
- 正如您提到的,
eventsByPersistenceId
会给您一个 Akka StreamsSource
,所以有点不清楚您所说的 "each call" 是什么意思。您从此源定义一个流并将其具体化一次,新事件将由它发出。除其他外,您可以将它们发送给演员,用mapAsync
和ask
替换您的PersistentView
。 http://doc.akka.io/docs/akka/snapshot/scala/persistence-query.html#materialize-view-using-mapasync and http://doc.akka.io/docs/akka/current/scala/stream/stream-integrations.html#mapasync-ask 上解释了这种方法 所以从你的演员的角度来看,它仍然是 "push-based" 并由receive
处理。 请注意,mapAsync
在第一个参数列表中采用并行度因子。要按事件发生的顺序处理事件,您应该将其设置为“1”(即无并行性)。如果你将它设置为更高的值,比如 n,流将接收 n 事件并将消息并行发送给 actor,这意味着他们最终会以随机顺序进入邮箱。 - 再说一次,"each call" 是什么意思?它很可能只是一个 - 你 run/materialize 启动时的流,它将无限期地流式传输事件。底层日志插件实现很可能会使用轮询,这是正确的。 (我不知道,但我怀疑
PersistentView
也是这种情况?) 所以您肯定不想创建大量这些来源。但是,如果您对来自许多参与者的事件感兴趣,您更有可能标记这些事件,然后使用eventsByTag
获取具有给定标签的所有事件的来源。 - 当时对此进行了一些讨论。用我自己的话来说,我想说的是,为单个演员提供 view/read 方面的用例并不常见。要基于 Akka Persistence 构建 CQRS 系统,需要一种更强大的方式来消费任何事件集并以任意方式处理它们,这使得流式查询成为更好的选择。用 Akka 团队的话说,设计决策在 Akka Persistence on the Query Side: The Conclusion. 中进行了解释
- 我不知道图书馆,即使我知道,如果不知道你的用例,也就是你在接收演员中实际做了什么,就很难说。就个人而言,我对
PersistenceQuery
感到满意,并且认为没有必要模仿PersistentView
,尤其是因为将事件从流发送到 actor 相当容易,如 1.[=33 中所述=]
这个 repository 演示了 PersistenceQuery
的用法,并提供了一个非常轻量级的 Akka 已弃用的 PersistentView
与 Stream API.[=13= 的重新实现]