编排 saga 不会 "survive" 服务失败
Orchestrating saga does not "survive" service failure
假设您有两个不同的微服务(客户和帐户)都运行作为Docker容器中的Spring启动应用程序。每次创建新客户时,也应创建相应的帐户。为了编排这个流程,我有第三个“服务”实现基于编排的 saga 逻辑。
传奇“服务”包含以下代码。
@Saga
public class CustomerAccountSaga {
private static final String ACCOUNT_CREATION_DEADLINE = "sagas.account-creation-deadline";
private static final Logger LOGGER = LogManager.getLogger(CustomerAccountSaga.class);
@Autowired
private transient CommandGateway commandGateway;
@Autowired
private transient DeadlineManager deadlineManager;
private String customerId;
private String accountDeadlineId;
@StartSaga
@SagaEventHandler(associationProperty = "aggregateId")
private void on(CustomerCreatedEvent event) {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("A new customer has been created for id = '{}'", event.getAggregateId());
}
this.customerId = event.getAggregateId().getId();
SagaLifecycle.associateWith("customerId", customerId);
//Both services has a CustomerId class defined in another package.
b.t.c.a.v.CustomerId id = new b.t.c.a.v.CustomerId(customerId);
CreateAccountCommand createAccount = new CreateAccountCommand(id);
commandGateway.send(createAccount);
this.accountDeadlineId = deadlineManager.schedule(Duration.ofDays(1), ACCOUNT_CREATION_DEADLINE);
}
@SagaEventHandler(associationProperty = "aggregateId")
private void on(CustomerDeletedEvent event) {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("A customer with id '{}' has been deleted. "
+ "The customer was deleted before the account was created, "
+ "or the request to create the account timed-out",
event.getAggregateId());
}
deadlineManager.cancelSchedule(ACCOUNT_CREATION_DEADLINE, accountDeadlineId);
SagaLifecycle.end();
}
@SagaEventHandler(associationProperty = "customerId")
private void on(AccountCreatedEvent event) {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("A corresponding account for customer with id '{}' has been created",
event.getCustomerId());
}
deadlineManager.cancelSchedule(ACCOUNT_CREATION_DEADLINE, accountDeadlineId);
SagaLifecycle.end();
}
@DeadlineHandler(deadlineName = ACCOUNT_CREATION_DEADLINE)
public void on() {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("Failed to create a new account for customer with id '{}' in a timely fashion",
customerId);
}
//Both services has a CustomerId class defined in another package.
b.t.c.c.v.CustomerId id = new b.t.c.c.v.CustomerId(customerId);
DeleteCustomerCommand deleteCustomer = new DeleteCustomerCommand(id);
commandGateway.send(deleteCustomer);
}
}
当所有服务都启动并且 运行 时,一切都按预期进行。 CustomerCreatedEvent 由 saga 处理程序处理,并按预期触发 CreateAccountCommand。后者导致创建帐户并触发 AccountCreatedEvent,这也由 saga 逻辑处理。
当我尝试以下场景时出现问题。在所有情况下,客服都是运行.
场景A
- 使用客户服务创建新客户。
- 启动帐户服务。由于帐户服务不侦听来自客户服务的任何事件,因此没有按预期发生任何事情。
- 启动传奇服务。我希望 saga 服务收到之前未处理的 CustomerCreatedEvent 以协调相应帐户的创建。
场景B
- 使用客户服务创建新客户。
- 启动传奇服务。我希望 saga 服务能够处理 CustomerCreatedEvent,但我没有收到该事件。
- 启动帐户服务。我希望帐户服务收到来自 saga 服务的 CreateAccountCommand,但它没有收到,因为第 2 步(在此流程中)没有被执行。
场景C
客户和帐户服务都已启动 运行。传奇服务已下线。
- 创建新客户。
- 启动传奇服务。我再次希望 saga 服务获取 CustomerCreatedEvent 并继续,但它没有。
由于在所有情况下都不会发生预期的行为,因此应用程序将处于不一致状态,因为不允许存在没有相应帐户的客户。
saga 服务使用以下配置为 Quartz 和 Axon 配置了一个持久存储 MySQL。 Axon 使用 Jackson 序列化程序处理事件。
# Persistence configuration (MySQL)
###################################
# Quartz persistence
spring.quartz.job-store-type=jdbc
spring.quartz.jdbc.initialize-schema=always
spring.quartz.properties.org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
spring.quartz.properties.org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
spring.quartz.properties.org.quartz.jobStore.dataSource=dsQuartz
spring.quartz.properties.org.quartz.jobStore.tablePrefix=QRTZ_
spring.quartz.properties.org.quartz.dataSource.dsQuartz.user = ******
spring.quartz.properties.org.quartz.dataSource.dsQuartz.password = *******
spring.quartz.properties.org.quartz.dataSource.dsQuartz.maxConnections = 10
spring.quartz.properties.org.quartz.dataSource.dsQuartz.driver = com.mysql.cj.jdbc.Driver
spring.quartz.properties.org.quartz.dataSource.dsQuartz.URL = jdbc:mysql://192.168.99.100:3306/saga-store
# Axon persistence
spring.datasource.url=jdbc:mysql://192.168.99.100:3306/saga-store
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.datasource.username=******
spring.datasource.password=*****
spring.jpa.database-platform=org.hibernate.dialect.MySQL8Dialect
spring.jpa.hibernate.ddl-auto=update
问题:
我是不是误解了微服务和编排 sagas 的一些基本概念,还是我在 setup/designed 包含业务逻辑和 saga 编排逻辑的不同微服务的方式上忽略了一些东西?
感谢您阅读我的post并指出我的错误。
在不太了解您的设置的情况下,我会说这完全取决于配置。
所以,基本上当你启动一个 @Saga
时,它还有一个底层的 Streaming Event Processor
,它可以从 tail
(最旧的事件)或 head
(最新的事件)开始).
Saga 流处理器的默认值是 head
,如我们的 ref-guide 所述。如果未配置任何内容,您的 Saga 只会对新事件做出反应,而不会对过去的事件做出反应 - 在从一个事件更改为另一个事件之前,您应该非常小心并仔细考虑。
另一个需要注意的重要点是关于 Saga Store
,如果没有配置,它会使用 InMemorySagaStore
。当然,这可能并不理想,您可以配置一个持久的。我们的 ref-guide 再次提供所有作品。
假设您有两个不同的微服务(客户和帐户)都运行作为Docker容器中的Spring启动应用程序。每次创建新客户时,也应创建相应的帐户。为了编排这个流程,我有第三个“服务”实现基于编排的 saga 逻辑。
传奇“服务”包含以下代码。
@Saga
public class CustomerAccountSaga {
private static final String ACCOUNT_CREATION_DEADLINE = "sagas.account-creation-deadline";
private static final Logger LOGGER = LogManager.getLogger(CustomerAccountSaga.class);
@Autowired
private transient CommandGateway commandGateway;
@Autowired
private transient DeadlineManager deadlineManager;
private String customerId;
private String accountDeadlineId;
@StartSaga
@SagaEventHandler(associationProperty = "aggregateId")
private void on(CustomerCreatedEvent event) {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("A new customer has been created for id = '{}'", event.getAggregateId());
}
this.customerId = event.getAggregateId().getId();
SagaLifecycle.associateWith("customerId", customerId);
//Both services has a CustomerId class defined in another package.
b.t.c.a.v.CustomerId id = new b.t.c.a.v.CustomerId(customerId);
CreateAccountCommand createAccount = new CreateAccountCommand(id);
commandGateway.send(createAccount);
this.accountDeadlineId = deadlineManager.schedule(Duration.ofDays(1), ACCOUNT_CREATION_DEADLINE);
}
@SagaEventHandler(associationProperty = "aggregateId")
private void on(CustomerDeletedEvent event) {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("A customer with id '{}' has been deleted. "
+ "The customer was deleted before the account was created, "
+ "or the request to create the account timed-out",
event.getAggregateId());
}
deadlineManager.cancelSchedule(ACCOUNT_CREATION_DEADLINE, accountDeadlineId);
SagaLifecycle.end();
}
@SagaEventHandler(associationProperty = "customerId")
private void on(AccountCreatedEvent event) {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("A corresponding account for customer with id '{}' has been created",
event.getCustomerId());
}
deadlineManager.cancelSchedule(ACCOUNT_CREATION_DEADLINE, accountDeadlineId);
SagaLifecycle.end();
}
@DeadlineHandler(deadlineName = ACCOUNT_CREATION_DEADLINE)
public void on() {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("Failed to create a new account for customer with id '{}' in a timely fashion",
customerId);
}
//Both services has a CustomerId class defined in another package.
b.t.c.c.v.CustomerId id = new b.t.c.c.v.CustomerId(customerId);
DeleteCustomerCommand deleteCustomer = new DeleteCustomerCommand(id);
commandGateway.send(deleteCustomer);
}
}
当所有服务都启动并且 运行 时,一切都按预期进行。 CustomerCreatedEvent 由 saga 处理程序处理,并按预期触发 CreateAccountCommand。后者导致创建帐户并触发 AccountCreatedEvent,这也由 saga 逻辑处理。
当我尝试以下场景时出现问题。在所有情况下,客服都是运行.
场景A
- 使用客户服务创建新客户。
- 启动帐户服务。由于帐户服务不侦听来自客户服务的任何事件,因此没有按预期发生任何事情。
- 启动传奇服务。我希望 saga 服务收到之前未处理的 CustomerCreatedEvent 以协调相应帐户的创建。
场景B
- 使用客户服务创建新客户。
- 启动传奇服务。我希望 saga 服务能够处理 CustomerCreatedEvent,但我没有收到该事件。
- 启动帐户服务。我希望帐户服务收到来自 saga 服务的 CreateAccountCommand,但它没有收到,因为第 2 步(在此流程中)没有被执行。
场景C
客户和帐户服务都已启动 运行。传奇服务已下线。
- 创建新客户。
- 启动传奇服务。我再次希望 saga 服务获取 CustomerCreatedEvent 并继续,但它没有。
由于在所有情况下都不会发生预期的行为,因此应用程序将处于不一致状态,因为不允许存在没有相应帐户的客户。
saga 服务使用以下配置为 Quartz 和 Axon 配置了一个持久存储 MySQL。 Axon 使用 Jackson 序列化程序处理事件。
# Persistence configuration (MySQL)
###################################
# Quartz persistence
spring.quartz.job-store-type=jdbc
spring.quartz.jdbc.initialize-schema=always
spring.quartz.properties.org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
spring.quartz.properties.org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
spring.quartz.properties.org.quartz.jobStore.dataSource=dsQuartz
spring.quartz.properties.org.quartz.jobStore.tablePrefix=QRTZ_
spring.quartz.properties.org.quartz.dataSource.dsQuartz.user = ******
spring.quartz.properties.org.quartz.dataSource.dsQuartz.password = *******
spring.quartz.properties.org.quartz.dataSource.dsQuartz.maxConnections = 10
spring.quartz.properties.org.quartz.dataSource.dsQuartz.driver = com.mysql.cj.jdbc.Driver
spring.quartz.properties.org.quartz.dataSource.dsQuartz.URL = jdbc:mysql://192.168.99.100:3306/saga-store
# Axon persistence
spring.datasource.url=jdbc:mysql://192.168.99.100:3306/saga-store
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.datasource.username=******
spring.datasource.password=*****
spring.jpa.database-platform=org.hibernate.dialect.MySQL8Dialect
spring.jpa.hibernate.ddl-auto=update
问题: 我是不是误解了微服务和编排 sagas 的一些基本概念,还是我在 setup/designed 包含业务逻辑和 saga 编排逻辑的不同微服务的方式上忽略了一些东西?
感谢您阅读我的post并指出我的错误。
在不太了解您的设置的情况下,我会说这完全取决于配置。
所以,基本上当你启动一个 @Saga
时,它还有一个底层的 Streaming Event Processor
,它可以从 tail
(最旧的事件)或 head
(最新的事件)开始).
Saga 流处理器的默认值是 head
,如我们的 ref-guide 所述。如果未配置任何内容,您的 Saga 只会对新事件做出反应,而不会对过去的事件做出反应 - 在从一个事件更改为另一个事件之前,您应该非常小心并仔细考虑。
另一个需要注意的重要点是关于 Saga Store
,如果没有配置,它会使用 InMemorySagaStore
。当然,这可能并不理想,您可以配置一个持久的。我们的 ref-guide 再次提供所有作品。