Validation/invariants 在 CQRS 应用程序中执行

Validation/invariants enforcing in a CQRS app

我搜索了 SO 和其他 forums/mailing 列表来寻找关于这个主题的答案,虽然我知道答案在很大程度上取决于域本身,以及从最终一致性点来看什么是可接受的看来,我还在努力寻找一个好的解决方案。

这个问题与在何处验证适用于该域的业务规则有关。

我的域是一个在线市场。成员(具有卖家角色)可以 post 广告来销售商品。卖家可以指定单个订单中可以购买的最小和最大商品数量,以及商品的价格。

买家可以通过特定广告购买商品。必须遵守以下规则:

My Market BC 是处理广告和购买交易的平台。我是这样设计的:

我正在努力解决如何以及在何处验证上述业务规则,在本例中,这些规则跨越多个聚合。我最好有一个方法:

$buyer->buy($adId, $quantity);

这将由 BuyItems 命令调用

$buyCommand = new BuyItems($adId, $qty);

在会员聚合上。

在我收集到的选项中:

  1. 在域外的外层验证 - 这意味着我会在将命令发送到域之前验证命令。这意味着某些逻辑会泄漏到域外,但我会从读取模型中获取广告,验证约束(在最小值和最大值、广告活动、用户活动之间),然后发送命令。 在这种情况下,我还会以流程管理器的形式进行域端验证,该流程管理器会发出补偿操作,或者至少在出现不一致时发出警告。

  2. 在域中定义一个服务接口,并实现一个从读取模型中获取数据的服务,然后通过调用该服务在命令处理程序中进行验证。如果数据无效,则抛出异常。 域验证也必须在这里进行,因为读取模型可能不一致(再次使用流程管理器)。

  3. 在 BuyItem 处理程序中加载 Ad 和 Member 聚合根,并将其传递给 $buyer->buy($ad, $member, $qty);然后在 AR 的 buy() 方法中,检查数量是否在最小值和最大值之间。对这个选项真的不太满意,因为我提供的是我试图填充事务一致性,当我真的不需要它时(虽然我需要将命令的风险降到最低,但数量超出限制或不活跃的成员,如果发生这种情况也没什么大不了的,之后我会采取纠正措施,所以我完全同意最终的一致性。

谁能告诉我在这种情况下最好的选择是什么?

域模型是其自身当前状态的权威,而不是流程的任何其他部分。

通常有两种不同的验证。首先是消息的验证;根据您的情况,购买消息。它是否具有所有必需的数据,数据的形状是否正确,等等。此验证步骤正在隔离 中查看消息 ,这与您验证 XML 文档的方式非常相似。

假设这是一条命令消息,我们现在将其传递给域模型以进行操作。域模型拥有 模型如何响应消息而改变的业务逻辑

所以领域模型知道广告是否有效,这个特定成员是否信誉良好,项目数量是否合理。因此,它可以决定如何改变。也许它根本不会改变——隐含地丢弃消息。也许它会通过将消息添加到丢弃消息的显式列表来改变。

I'm struggling with how and where to validate the above business rules, which in this case span multiple aggregates.

有时,这表明您的聚合边界不太正确;在其他情况下,这意味着您没有正确考虑阅读。

通常,消息被路由到将要(可能)更改的聚合,并且访问处理消息所需的其他模型数据的能力被作为参数传递给聚合。

您有一个跨越多个聚合的业务流程,这是肯定的。为此,您有两个选择:

  1. 通过将多个聚合类型合并为一个来修改聚合边界。代码更简单,补偿由数据库通过回滚自动完成。可扩展性不是很好。

  2. 用一个Saga来建模整个过程。您需要为每次失败发送补偿命令。这是我将在其余答案中写的选项。

您基本上必须在单个大(全局)事务和多个较小(本地)事务之间做出选择。

Saga 应该只包含协调逻辑,它不应该自己执行业务规则。关于如何建模的提示是:当您添加有关广告购买流程的新业务规则时,不应修改 Saga。

业务规则(不变量)应由拥有验证所需数据的每个聚合体检查。例如:

规则 1:他们可以指定他们想要购买的商品数量,该数量必须介于广告允许的最小和最大之间 - 广告聚合

规则 2:他们需要活跃(因为成员可以被禁止 - 买家汇总

规则 3:广告需要处于活动状态(广告可以暂停)- 广告聚合

规则 1 和规则 3 由 Ad::buyedBy($buyerId, $quantity) 检查,规则 2 由 Buyer::buyAd($buyerId, $quantity) 检查。 Saga 只会粘合那些方法调用。它是如何做到的取决于您的低级架构和弹性要求。

假设您将使用 cqrs.nu where the Aggregates process Commands (they have methods like handleXXX(XXX $command)), like I would have done 提倡的样式,那么您的聚合和 Saga 将如下所示:

class Ad
{
    function handleBuyAd(BuyAd $command)
    {
        if (!$this->active) {
            throw new \Exception("Ad not active");
        }
        if ($command->quantity < $this->minimum || $command->quantity > $this->maximum) {
            throw new \Exception("Too litle or too many");
        }

        yield new AdWasBuyed($this->id, $command->buyerId, $command->quantity);
    }

    function handleCancelAdBuy(CancelAdBuy $command)
    {
        yield new AdBuyinWasCancelled($this->id, $command->buyerId, $command->quantity);
    }
}

class Buyer
{
    function handleBuyerBuysAd(BuyerBuysAd $command)
    {
        if ($this->banned) {
            throw new \Exception("Buyer is banned");
        }

        yield new BuyerBuyedAd($command->transactionId, $this->id, $command->buyerId, $command->quantity);
    }
}

class BuyAdSaga
{
    /** @var CommandDispather  */
    private $commandDispatcher; //injected

    function start($transactionId, $adId, $buyerId, $quantity)
    {
        $this->commandDispatcher->dispatchCommand(new BuyAd($transactionId, $adId, $buyerId, $quantity));
    }

    function processAdWasBuyed(AdWasBuyed $event) //"process" means only once
    {
        try {
            $this->commandDispatcher->dispatchCommand(new BuyerBuysAd($event->transactionId, $event->adId, $event->buyerId, $event->quantity));
        } catch (\Exception $exception) {
            // this is a compensating command
            $this->commandDispatcher->dispatchCommand(new CancelAdBuy($event->transactionId, $event->adId, $event->buyerId, $event->quantity));
        }
    }
}

这些命令包含一个 $transationId 用于标识购买广告的过程。也可以看成是一种关联Id。你可以扔掉它。

Saga 是通过start 方法启动的。您也可以转储它并考虑通过向 Ad Aggregate 发送第一个命令来启动 Saga。我这样做是为了更明确地说明这个过程是如何开始的。

如果 BuyAd 命令失败,则不需要补偿,但如果 BuyerBuysAd 命令失败,则通过向广告聚合发送命令 CancelAdBuy 来完成补偿。

请注意,此 Saga 仅通过发送命令对事件做出反应,仅此而已。它不强制执行任何业务不变量,它只是协调整个过程。

They can specify the number of items that they would like to buy, which has to be between min and max allowed by the ad.

They need to be active (as members can be banned).

The ad needs to be active (Ads can be suspended).

看起来 #1 和 #3 可以通过让 Ad 产生新的 BuyTransaction 来解决,有点类似于 here.

对于 #2,我从未见过系统通过域级别的即时一致性来强制执行用户有效性(即检查当前用户是否在与涉及 Ad 聚合根的事务相同的事务中处于活动状态) .我会将其委托给访问控制层。