一个事件源实体如何订阅另一个实体的状态变化?
How can an event sourced entity to subscribe to state changes in another entity?
我有一个事件源实体 (C),它需要更改其状态以响应不同类型的另一个实体 (P) 中的状态更改。 C 的状态是否应该实际改变的逻辑非常复杂,并且要计算的数据存在于 C 中;此外,C 的许多实例应该监听 P 的一个实例,并且实例集会随着时间的推移而增加,所以我宁愿他们从知道 P 的 ID 的流中退出,也不愿让 P 跟踪所有的 ID Cs 并推送给他们。
我正在考虑做这样的事情:
- 标记 P 事件的投影
- 有一个 Subscribe(P.id) 命令被发送到 C
- 如果 C 尚未订阅 P(它只能订阅一个,并且不应更改),则触发一个事件 Subscribed(P.id)
- 为了响应事件,使用 Akka-persistent-query 实现 1 中标记的事件流,将它们映射到命令,然后 运行 异步地通过同步将它们发送到我的 ES 实体引用
这似乎有点像在事件处理程序中包含流 运行 的反模式。我想知道是否有 better/more 支持的方式来做到这一点,而上游不必知道下游。我决定不使用 Akka pub-sub,因为它最多只能发送一次,而且我想尽可能避免使用 Kafka。
您肯定不想 运行 事件处理程序中的流:事件处理程序应该 永远不会 副作用。
假设您希望 C
从 C
不是 运行ning 的时间获取事件(包括在此之前 C
曾经 运行),这表明每个 C
的流应该是 运行。由于订阅将针对一个特定的 P
,我会认真考虑不标记,而是使用 eventsByPersistenceId
流来获取 P
的所有事件并忽略那些'感兴趣。在流中,您将这些转换为 C
的 API 中的命令,包括 P
事件流中带有命令的偏移量,并将其发送到 C
(对于至少一次交付,带有询问的 mapAsync
很有用;C
将保留一个事件记录,它处理了偏移量:这允许命令是幂等的,因为 C
如果偏移量小于或等于其状态下的高水位偏移量,则可以确认该命令。
此流在成功持久化 Subscribed(P.id)
事件后由命令处理程序启动(在本例中从偏移量 0 开始),然后在持久化 actor 重新水合后启动(如果状态显示它是)订阅(在这种情况下从一加高水位偏移量开始)。
此处不使用标签的理由是假设 C
不感兴趣的事件数量小于带有 P
标签的事件数量 P
=10=] 未订阅(请注意,对于大多数持久性插件,标签越多,开销就越大:仅由实体的一个特定实例使用的标签通常不是一个好主意).如果有问题的标签很少见,这个假设可能不成立,eventsByTag
和按 id 过滤可能很有用。
这当然有一个缺点,即每个 C
的 运行 宁离散流:取决于有多少 C
订阅了给定的 P
,这样做的开销可能很大,而且被赶上的订阅者的流将特别浪费。在这种情况下,可以将向给定 P
的订阅 C
发送命令的责任转移给参与者。该场景中唯一真正的变化是 C
将 运行 流,它通过询问参与者从 P
提供事件来确认它已订阅事件流。因为这种方法明显增加了复杂性(特别是在 C
加入和退出共享的“捕获”流时的管理),我倾向于建议从 stream-per- C
方法,然后转到共享流(同样值得注意的是,可以有多个共享流:事实上,我倾向于让每个共享流成为 ActorSystem
(例如,“节点单例“每个 P
的兴趣),以免涉及远程处理),因为进行转换并不困难(从 C
的角度来看,改编的命令是否来自流它开始或来自其他演员 运行 的流)。
我有一个事件源实体 (C),它需要更改其状态以响应不同类型的另一个实体 (P) 中的状态更改。 C 的状态是否应该实际改变的逻辑非常复杂,并且要计算的数据存在于 C 中;此外,C 的许多实例应该监听 P 的一个实例,并且实例集会随着时间的推移而增加,所以我宁愿他们从知道 P 的 ID 的流中退出,也不愿让 P 跟踪所有的 ID Cs 并推送给他们。
我正在考虑做这样的事情:
- 标记 P 事件的投影
- 有一个 Subscribe(P.id) 命令被发送到 C
- 如果 C 尚未订阅 P(它只能订阅一个,并且不应更改),则触发一个事件 Subscribed(P.id)
- 为了响应事件,使用 Akka-persistent-query 实现 1 中标记的事件流,将它们映射到命令,然后 运行 异步地通过同步将它们发送到我的 ES 实体引用
这似乎有点像在事件处理程序中包含流 运行 的反模式。我想知道是否有 better/more 支持的方式来做到这一点,而上游不必知道下游。我决定不使用 Akka pub-sub,因为它最多只能发送一次,而且我想尽可能避免使用 Kafka。
您肯定不想 运行 事件处理程序中的流:事件处理程序应该 永远不会 副作用。
假设您希望 C
从 C
不是 运行ning 的时间获取事件(包括在此之前 C
曾经 运行),这表明每个 C
的流应该是 运行。由于订阅将针对一个特定的 P
,我会认真考虑不标记,而是使用 eventsByPersistenceId
流来获取 P
的所有事件并忽略那些'感兴趣。在流中,您将这些转换为 C
的 API 中的命令,包括 P
事件流中带有命令的偏移量,并将其发送到 C
(对于至少一次交付,带有询问的 mapAsync
很有用;C
将保留一个事件记录,它处理了偏移量:这允许命令是幂等的,因为 C
如果偏移量小于或等于其状态下的高水位偏移量,则可以确认该命令。
此流在成功持久化 Subscribed(P.id)
事件后由命令处理程序启动(在本例中从偏移量 0 开始),然后在持久化 actor 重新水合后启动(如果状态显示它是)订阅(在这种情况下从一加高水位偏移量开始)。
此处不使用标签的理由是假设 C
不感兴趣的事件数量小于带有 P
标签的事件数量 P
=10=] 未订阅(请注意,对于大多数持久性插件,标签越多,开销就越大:仅由实体的一个特定实例使用的标签通常不是一个好主意).如果有问题的标签很少见,这个假设可能不成立,eventsByTag
和按 id 过滤可能很有用。
这当然有一个缺点,即每个 C
的 运行 宁离散流:取决于有多少 C
订阅了给定的 P
,这样做的开销可能很大,而且被赶上的订阅者的流将特别浪费。在这种情况下,可以将向给定 P
的订阅 C
发送命令的责任转移给参与者。该场景中唯一真正的变化是 C
将 运行 流,它通过询问参与者从 P
提供事件来确认它已订阅事件流。因为这种方法明显增加了复杂性(特别是在 C
加入和退出共享的“捕获”流时的管理),我倾向于建议从 stream-per- C
方法,然后转到共享流(同样值得注意的是,可以有多个共享流:事实上,我倾向于让每个共享流成为 ActorSystem
(例如,“节点单例“每个 P
的兴趣),以免涉及远程处理),因为进行转换并不困难(从 C
的角度来看,改编的命令是否来自流它开始或来自其他演员 运行 的流)。