F# 中的事件溯源:如果必须在事件生成之间更新聚合状态怎么办?
Event Sourcing in F#: what if aggregate state must be updated between events production?
我刚开始研究 F# 语言,最近阅读了 article 关于函数式事件溯源的文章,有一些问题需要澄清。提供了以下命令处理程序:
let execute state command =
let event =
match command with
| AddTask args ->
args.Id
|> onlyIfTaskDoesNotAlreadyExist state
|> (fun _ -> TaskAdded args)
| RemoveTask args ->
args.Id
|> onlyIfTaskExists state
|> (fun _ -> TaskRemoved args)
| ClearAllTasks -> AllTasksCleared
| CompleteTask args ->
args.Id
|> onlyIfTaskExists state
|> (fun _ -> TaskCompleted args)
| ChangeTaskDueDate args ->
args.Id
|> (onlyIfTaskExists state >> onlyIfNotAlreadyFinished)
|> (fun _ -> TaskDueDateChanged args)
event |> List.singleton //we must return list of events
一个命令产生一个事件,这很简单。但是如何处理在产生一些新事件之前必须更新聚合状态的情况?
考虑以下示例:您有一个 Order 聚合和一个 OrderLine 实体。用户可以添加或删除订单行,每次他这样做时都必须计算该订单的总金额。可以通过三个函数实现:
let addOrderLine order catalogItem = if (isOpen order) then Ok OrderLineAdded(catalogItem .Id, catalogItem.Price) else Error OrderAlreadyClosed
let removeOrderLine order lineId = if (isOpen order) then Ok OrderLineRemoved(lineId) else Error OrderAlreadyClosed
let calculateOrder order =
let sum = order.orderLines |> List.sum (fun x -> x.Price)
OrderCalculated sum
所以 caculateOrder 函数应该只在实际聚合状态下调用。这意味着我需要先添加订单行,使用生成的 OrderLineAdded 事件更新状态,最后执行计算。毕竟我将从处理程序中收集所有产生的事件和 return 它们。以 OOP 风格实现这种情况非常容易,但在 FP 中,处理程序的大小增长得非常快。这对我来说似乎是设计的味道。
我是否应该重新考虑我的设计并将 OrderLineAdded 和 OrderCalculated 事件合二为一?或者这里可以隐藏一些优雅的解决方案?
欢迎提出任何建议
编辑
我将添加一个 C# 实现,这不会让我产生疑问。 OOP 聚合是一个可变对象,你可以很容易地更新它的状态,这不是函数式编程方式
public sealed class Order: EventSourcedAggregate
{
public int Id { get; private set; }
public bool isOpen { get; private set; }
public decimal TotalSum { get; private set; }
public List<OrderLine> OrderLines { get; }
public void Apply(OrderLineAdded e) { OrderLines.Add(new OrderLine(e.Id, e.Price));}
public void Apply(OrderCalculated e) { TotalSum = e.Sum;}
public void AddOrderLine(CatalogItem item)
{
if (!isOpen) throw new InvalidOperationException("Order closed exception");
Emit(new OrderLineAdded(item.Id, item.Price)); //Emit method saves event as uncommitted and updates aggregate state
CalculateOrder(); //calculates total sum
}
private void CalculateOrder()
{
var sum = OrderLines.Sum(x => x.Price);
Emit(new OrderCalculated(sum));
}
}
免责声明:我无论如何都不是 F# 开发人员,所以我会在 Scala 中回答这个问题。
请记住,您可以使用 event-handling 函数(因为您的集合是由事件处理程序和命令处理程序对定义的):
val eventHandler: Event => Aggregate => Aggregate = ??? // ??? meaning "not implemented yet, but we know the type it'll be
这确实意味着给定处理程序的每个事件都同构于从 Aggregate
到 Aggregate
的函数,这是事件的最有趣的 FP 解释 (IMO)。
所以在命令处理程序中,如果出于某种原因想要在应用事件后聚合的状态,则只需要自己应用事件处理程序
val commandHandler: Aggregate => Command => (Response, Seq[Event]) = state => cmd => {
cmd match {
case AddOrderLine(catalogId, price) =>
if (!isOpen(state)) Error(OrderAlreadyClosed) -> Seq.empty
else {
val added = OrderLineAdded(catalogId, price)
val intermediateState = eventHandler(added)(state)
// explicit iterator to not create an intermediate strict collection
val sum = intermediateState.orderLines.iterator.map(_.price).sum
Ok -> Seq(added, OrderCalculated(sum))
}
}
}
当然,OrderLineAdded
的事件处理程序没有理由不能更新总和。事件溯源的部分好处是您的事件可以指定对聚合的任意多个属性的更改(具有命令和相应事件的模型,用于更改聚合的每个 属性,其中没有事件更改超过一个属性 可能是贫血域模型的典型示例)。关于我认为在命令处理程序中应用事件处理程序有意义的唯一原因是确定给定命令无效的情况将有效地复制事件处理程序的逻辑并使结果状态无法表示会产生不利影响在整体代码清晰度上,或者在您的语言类型系统中不容易表达:有点像“f around,find out,然后忘记它曾经发生过”的方法。一个例子可能是某个域约束,即有效订单的总成本绝不能是偶数和大于 4 的完全平方的乘积(因此禁止使用 $18、$32、$36、$50、$54、$64 等).
我刚开始研究 F# 语言,最近阅读了 article 关于函数式事件溯源的文章,有一些问题需要澄清。提供了以下命令处理程序:
let execute state command =
let event =
match command with
| AddTask args ->
args.Id
|> onlyIfTaskDoesNotAlreadyExist state
|> (fun _ -> TaskAdded args)
| RemoveTask args ->
args.Id
|> onlyIfTaskExists state
|> (fun _ -> TaskRemoved args)
| ClearAllTasks -> AllTasksCleared
| CompleteTask args ->
args.Id
|> onlyIfTaskExists state
|> (fun _ -> TaskCompleted args)
| ChangeTaskDueDate args ->
args.Id
|> (onlyIfTaskExists state >> onlyIfNotAlreadyFinished)
|> (fun _ -> TaskDueDateChanged args)
event |> List.singleton //we must return list of events
一个命令产生一个事件,这很简单。但是如何处理在产生一些新事件之前必须更新聚合状态的情况?
考虑以下示例:您有一个 Order 聚合和一个 OrderLine 实体。用户可以添加或删除订单行,每次他这样做时都必须计算该订单的总金额。可以通过三个函数实现:
let addOrderLine order catalogItem = if (isOpen order) then Ok OrderLineAdded(catalogItem .Id, catalogItem.Price) else Error OrderAlreadyClosed
let removeOrderLine order lineId = if (isOpen order) then Ok OrderLineRemoved(lineId) else Error OrderAlreadyClosed
let calculateOrder order =
let sum = order.orderLines |> List.sum (fun x -> x.Price)
OrderCalculated sum
所以 caculateOrder 函数应该只在实际聚合状态下调用。这意味着我需要先添加订单行,使用生成的 OrderLineAdded 事件更新状态,最后执行计算。毕竟我将从处理程序中收集所有产生的事件和 return 它们。以 OOP 风格实现这种情况非常容易,但在 FP 中,处理程序的大小增长得非常快。这对我来说似乎是设计的味道。
我是否应该重新考虑我的设计并将 OrderLineAdded 和 OrderCalculated 事件合二为一?或者这里可以隐藏一些优雅的解决方案?
欢迎提出任何建议
编辑
我将添加一个 C# 实现,这不会让我产生疑问。 OOP 聚合是一个可变对象,你可以很容易地更新它的状态,这不是函数式编程方式
public sealed class Order: EventSourcedAggregate
{
public int Id { get; private set; }
public bool isOpen { get; private set; }
public decimal TotalSum { get; private set; }
public List<OrderLine> OrderLines { get; }
public void Apply(OrderLineAdded e) { OrderLines.Add(new OrderLine(e.Id, e.Price));}
public void Apply(OrderCalculated e) { TotalSum = e.Sum;}
public void AddOrderLine(CatalogItem item)
{
if (!isOpen) throw new InvalidOperationException("Order closed exception");
Emit(new OrderLineAdded(item.Id, item.Price)); //Emit method saves event as uncommitted and updates aggregate state
CalculateOrder(); //calculates total sum
}
private void CalculateOrder()
{
var sum = OrderLines.Sum(x => x.Price);
Emit(new OrderCalculated(sum));
}
}
免责声明:我无论如何都不是 F# 开发人员,所以我会在 Scala 中回答这个问题。
请记住,您可以使用 event-handling 函数(因为您的集合是由事件处理程序和命令处理程序对定义的):
val eventHandler: Event => Aggregate => Aggregate = ??? // ??? meaning "not implemented yet, but we know the type it'll be
这确实意味着给定处理程序的每个事件都同构于从 Aggregate
到 Aggregate
的函数,这是事件的最有趣的 FP 解释 (IMO)。
所以在命令处理程序中,如果出于某种原因想要在应用事件后聚合的状态,则只需要自己应用事件处理程序
val commandHandler: Aggregate => Command => (Response, Seq[Event]) = state => cmd => {
cmd match {
case AddOrderLine(catalogId, price) =>
if (!isOpen(state)) Error(OrderAlreadyClosed) -> Seq.empty
else {
val added = OrderLineAdded(catalogId, price)
val intermediateState = eventHandler(added)(state)
// explicit iterator to not create an intermediate strict collection
val sum = intermediateState.orderLines.iterator.map(_.price).sum
Ok -> Seq(added, OrderCalculated(sum))
}
}
}
当然,OrderLineAdded
的事件处理程序没有理由不能更新总和。事件溯源的部分好处是您的事件可以指定对聚合的任意多个属性的更改(具有命令和相应事件的模型,用于更改聚合的每个 属性,其中没有事件更改超过一个属性 可能是贫血域模型的典型示例)。关于我认为在命令处理程序中应用事件处理程序有意义的唯一原因是确定给定命令无效的情况将有效地复制事件处理程序的逻辑并使结果状态无法表示会产生不利影响在整体代码清晰度上,或者在您的语言类型系统中不容易表达:有点像“f around,find out,然后忘记它曾经发生过”的方法。一个例子可能是某个域约束,即有效订单的总成本绝不能是偶数和大于 4 的完全平方的乘积(因此禁止使用 $18、$32、$36、$50、$54、$64 等).