运行 直接来自 Axon 聚合的命令处理程序的查询是否适用于 CQRS?
Is running a query directly from an Axon aggregate's command handler OK with CQRS?
这更多是与 CQRS 和 Axon 相关的理论问题。下面是一个不言自明的设置,代码是伪代码,没有编译的意思。
假设为了处理来自聚合“Foo”的命令,我们需要首先查询另一个聚合“Bar”的状态以进行验证(来自另一个有界上下文,所以这不是简单查找的问题“Foo”的成员聚合)。
如伪代码所示,我们在这里有两个选择。选择 (1),我们只是 运行 使用查询网关从“Foo”聚合中的命令处理程序查询。
选择 (2),我们应用一个事件来请求专用服务来处理查询,而不是“Foo”。然后,该服务在向系统查询“Bar”的状态后,会将命令发送回“Foo”。
第一个选择(直接从命令处理程序查询)似乎违背了命令-查询分离的整个想法——毕竟,这样我们是在命令处理期间执行查询,对吗?
第二个选择似乎更符合 CQRS 的精神:该命令只导致一个事件(稍后将导致另一个命令等)。但显然,这是有代价的:涉及的步骤更多,2a, 2b, 2c, 2d...
我想听听社区对此有何看法。对我来说,如果我们严格禁止将命令与查询处理混合但允许执行查询与命令处理,这似乎很奇怪。还是我遗漏了什么?
@Aggregate
class AggregateFoo {
private QueryGateway queryGateway;
@CommandHandler
public void on(UpdateFooCommand command){
/*
Assume that in order to validate this command we first need
to query the state of another aggregate, "Bar".
*/
// 1. We can just issue the query directly from the command handler.
queryGateway
.query(new AskForStateOfBarQuery(command.getBarId()))
.then(queryResponse -> {
// Proceed with original command execution depending
// the result of the query response.
});
// 2a. Or we can issue an intermediate EVENT offloading the query handling
// to a dedicated service ("FooBarService", see below).
AggregateLifecycle.apply(new FooUpdateValidationRequestedEvent(command.getBarId()));
}
// 2d. "Foo" aggregate will react to the validation command from the
// dedicated service effectively executing the original command.
@CommandHandler
public void on(ProceedWithFooUpdateCommand command){
// Do other validations, issue events here. At this
// point we know that UpdateFooCommand was validated.
}
}
@Service
class FooBarService {
private QueryGateway queryGateway;
private CommandGateway commandGateway;
@EventHandler
public void on(FooUpdateValidationRequestedEvent event){
// 2b. The dedicated service will run the corresponding query,
// asking for the state of "Bar".
queryGateway
.query(new AskForStateOfBarQuery(command.getBarId()))
.then(queryResponse -> {
// 2c. And will issue a COMMAND to the "Foo" aggregate
// indicating that it shoud proceed with the original
// command's (UpdateFooCommand) execution.
commandGateway.send(new ProceedWithFooUpdateCommand(command.getFooId()));
});
}
}
更新:
这是对 91stefan 给出的信息性答案进行讨论后的更新(见下文)。
class AggregateFoo {
int f = 9;
// reference to the related Bar aggregate
UUID bar;
on(UpdateFooCommand){
// Assume we must execute ONLY IF f < 10 AND bar.b > 10.
// So we apply event to Saga (with f = 9),
// Saga will ask Bar: (b = 15), so the condition holds
// and Saga issues ConfirmValidBarStateCommand
}
// Meanwhile, when Saga is busy validating, we process another
// command CHANGING the state of Foo
on(AnotherCommand) { f++; }
// or "ConfirmValidBarStateCommand" as in 91stefan's example
on(ProceedWithFooUpdateCommand){
// So when we get back here (from Saga), there is no guarantee
// that the actual state of Foo (f < 10) still holds,
// and that we can proceed with the execution of the
// original UpdateFooCommand
}
}
class AggregateBar {
int b = 15;
}
所以看起来问题仍然存在:如果 Foo 中的命令的验证取决于来自另一个有界上下文的 Bar 的状态,如何在 Foo 中一致地验证和执行命令?看来我们这里可能有几个选择:
- 从 Foo 的命令处理程序直接查询 Bar 的预计状态
- 向 Bar 查询投影状态的(无状态)服务的事件
- (91stefan 的建议)事件到 Saga,它向 Bar 发送验证命令
您可能会遇到问题,从命令处理程序查询不是一个好的做法,因为由于最终一致性,您的投影可能不是 up-to-date。此外,来自同一聚合的命令处理程序被执行 sequentially/synchronously。您只想做一些快速的事情,调用外部服务将阻止并阻止其他命令聚合执行,直到当前命令处理程序完成执行。
在这种情况下你需要的是 Saga。
https://docs.axoniq.io/reference-guide/v/3.3/part-ii-domain-logic/sagas
简化流程为:
- commandHandler(UpdateFooCommand) -> applyEvent(AskedForStateOfBarEvent)
- AskedForStateOfBarEvent 启动 Saga
- 在 saga 中,向 Bar aggregate 发送命令 -> ConfirmYourStateBar(barAggregateId)
- commandHandler(ConfirmYourStateBar) -> 验证状态 -> applyEvent(StateIsValidatedEvent)
- Saga 对 StateIsValidatedEvent 作出反应,向 AggregateFoo 发送命令 -> (ConfirmValidBarStateCommand(aggreageFooId) & saga 已关闭(结束事件为 StateIsValidatedEvent)
- 在 commandHandler(ConfirmValidBarStateCommand) 中,您继续执行,因为您的状态 BAR 状态有效
这更多是与 CQRS 和 Axon 相关的理论问题。下面是一个不言自明的设置,代码是伪代码,没有编译的意思。
假设为了处理来自聚合“Foo”的命令,我们需要首先查询另一个聚合“Bar”的状态以进行验证(来自另一个有界上下文,所以这不是简单查找的问题“Foo”的成员聚合)。
如伪代码所示,我们在这里有两个选择。选择 (1),我们只是 运行 使用查询网关从“Foo”聚合中的命令处理程序查询。
选择 (2),我们应用一个事件来请求专用服务来处理查询,而不是“Foo”。然后,该服务在向系统查询“Bar”的状态后,会将命令发送回“Foo”。
第一个选择(直接从命令处理程序查询)似乎违背了命令-查询分离的整个想法——毕竟,这样我们是在命令处理期间执行查询,对吗?
第二个选择似乎更符合 CQRS 的精神:该命令只导致一个事件(稍后将导致另一个命令等)。但显然,这是有代价的:涉及的步骤更多,2a, 2b, 2c, 2d...
我想听听社区对此有何看法。对我来说,如果我们严格禁止将命令与查询处理混合但允许执行查询与命令处理,这似乎很奇怪。还是我遗漏了什么?
@Aggregate
class AggregateFoo {
private QueryGateway queryGateway;
@CommandHandler
public void on(UpdateFooCommand command){
/*
Assume that in order to validate this command we first need
to query the state of another aggregate, "Bar".
*/
// 1. We can just issue the query directly from the command handler.
queryGateway
.query(new AskForStateOfBarQuery(command.getBarId()))
.then(queryResponse -> {
// Proceed with original command execution depending
// the result of the query response.
});
// 2a. Or we can issue an intermediate EVENT offloading the query handling
// to a dedicated service ("FooBarService", see below).
AggregateLifecycle.apply(new FooUpdateValidationRequestedEvent(command.getBarId()));
}
// 2d. "Foo" aggregate will react to the validation command from the
// dedicated service effectively executing the original command.
@CommandHandler
public void on(ProceedWithFooUpdateCommand command){
// Do other validations, issue events here. At this
// point we know that UpdateFooCommand was validated.
}
}
@Service
class FooBarService {
private QueryGateway queryGateway;
private CommandGateway commandGateway;
@EventHandler
public void on(FooUpdateValidationRequestedEvent event){
// 2b. The dedicated service will run the corresponding query,
// asking for the state of "Bar".
queryGateway
.query(new AskForStateOfBarQuery(command.getBarId()))
.then(queryResponse -> {
// 2c. And will issue a COMMAND to the "Foo" aggregate
// indicating that it shoud proceed with the original
// command's (UpdateFooCommand) execution.
commandGateway.send(new ProceedWithFooUpdateCommand(command.getFooId()));
});
}
}
更新:
这是对 91stefan 给出的信息性答案进行讨论后的更新(见下文)。
class AggregateFoo {
int f = 9;
// reference to the related Bar aggregate
UUID bar;
on(UpdateFooCommand){
// Assume we must execute ONLY IF f < 10 AND bar.b > 10.
// So we apply event to Saga (with f = 9),
// Saga will ask Bar: (b = 15), so the condition holds
// and Saga issues ConfirmValidBarStateCommand
}
// Meanwhile, when Saga is busy validating, we process another
// command CHANGING the state of Foo
on(AnotherCommand) { f++; }
// or "ConfirmValidBarStateCommand" as in 91stefan's example
on(ProceedWithFooUpdateCommand){
// So when we get back here (from Saga), there is no guarantee
// that the actual state of Foo (f < 10) still holds,
// and that we can proceed with the execution of the
// original UpdateFooCommand
}
}
class AggregateBar {
int b = 15;
}
所以看起来问题仍然存在:如果 Foo 中的命令的验证取决于来自另一个有界上下文的 Bar 的状态,如何在 Foo 中一致地验证和执行命令?看来我们这里可能有几个选择:
- 从 Foo 的命令处理程序直接查询 Bar 的预计状态
- 向 Bar 查询投影状态的(无状态)服务的事件
- (91stefan 的建议)事件到 Saga,它向 Bar 发送验证命令
您可能会遇到问题,从命令处理程序查询不是一个好的做法,因为由于最终一致性,您的投影可能不是 up-to-date。此外,来自同一聚合的命令处理程序被执行 sequentially/synchronously。您只想做一些快速的事情,调用外部服务将阻止并阻止其他命令聚合执行,直到当前命令处理程序完成执行。
在这种情况下你需要的是 Saga。 https://docs.axoniq.io/reference-guide/v/3.3/part-ii-domain-logic/sagas
简化流程为:
- commandHandler(UpdateFooCommand) -> applyEvent(AskedForStateOfBarEvent)
- AskedForStateOfBarEvent 启动 Saga
- 在 saga 中,向 Bar aggregate 发送命令 -> ConfirmYourStateBar(barAggregateId)
- commandHandler(ConfirmYourStateBar) -> 验证状态 -> applyEvent(StateIsValidatedEvent)
- Saga 对 StateIsValidatedEvent 作出反应,向 AggregateFoo 发送命令 -> (ConfirmValidBarStateCommand(aggreageFooId) & saga 已关闭(结束事件为 StateIsValidatedEvent)
- 在 commandHandler(ConfirmValidBarStateCommand) 中,您继续执行,因为您的状态 BAR 状态有效