事务提交发生得太晚(在事件被另一个系统处理之后)
Commit of transaction happening too late (after the event is processed by another system)
我们有一个销售代金券的系统,这个销售过程必须与另一个系统集成。这种集成通过 AWS SQS 队列进行。
系统 A 处理订单,然后在处理结束时将消息发布到名为 new-orders-queue
的 SQS 队列。
系统 B 从 new-orders-queue
中读取数据,进行某种处理,然后将另一个事件发布到另一个名为 [=13 的 SQS 队列=].
系统 A 从 another-sqs-queue
读取数据,然后更新在步骤 1
中创建的订单
订购流程(上面的第 1 步)很长,但没有什么特别复杂的。它在其数据库 (MySQL) 中进行一些验证,然后将一些插入内容写入某些表。
所有这些都发生在 Spring 的 @Transactional
上下文中。
问题是第 3 步有时会发生 在 来自第 1 步的订单最终提交到数据库之前,这会导致错误(必须更新的订单尚未在数据库中找到,因为它尚未提交)。如果我们稍后重试,该过程将正常进行。这种情况并非总是发生,但我们必须解决这个问题。
你看过了吗?
下面是第 1 步的简化(真实)伪代码:
@Transactional
public Result handleNewOrder(OrderData data) {
SqsClient sqsClient = new SqsClient();
validatePrices(data);
doSomeInserts(data);
Result result = createResult(data);
// the last line of the method, just before the return statement, is the line that post the event to the queue
sqsClient.sendEvent(Events.create(result));
return result;
}
在这个用 @Transactional
注释的方法结束时,应该提交一些东西,但是在提交发生之前不知何故第 3 步已经完成(至少看起来是这样)。
也许将事件发布移出事务边界是解决方案(实际上,我赞成它),因为这样我们可以保证只有在事务提交后才会处理事件到数据库。但是,如果我们与 SQS 的通信出现故障,我们将不得不使用某种重试机制。
这是要走的路还是您有更好的解决方案?
这听起来像是一个需要多个事务的操作。
例如,您可能有两个方法,每个方法都用@Transactional 注释:
@Transactional
public void startHandleNewOrder(OrderData data) {
// make changes to the database here and publish event to new-orders-queue
}
@Transactional
public Result finishHandleNewOrder(OrderData data) {
// await response from another-sqs-queue and compile result
}
这应该工作假设:
- 一个单独的服务 NOT 用 @Transactional 注释(即它在事务屏障之外)按顺序调用这些方法
或者,您可以在没有注释的情况下实现它,如下所示:
@Autowired
PlatformTransactionManager transactionManager;
@PersistenceContext
EntityManager entityManager;
public Result handleNewOrder(OrderData data) {
boolean rollback = true;
TransactionStatus status = getTransaction();
try {
// make changes to the database here and publish event to new-orders-queue
status.commit();
rollback = false;
} finally {
if (rollback)
status.rollback();
}
// this may or may not be necessary if you want to ensure you're reading
// fresh data from the database (otherwise cached from step #1 may be used)
entityManager.clear();
rollback = true;
status = getTransaction();
try {
// wait for a response and compile the result
status.commit();
rollback = false;
return result;
} finally {
if (rollback)
status.rollback();
}
}
private TransactionStatus getTransaction() {
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
return transactionManager.getTransaction(def);
}
最后,按照 M. Deinum 的建议,我用默认阶段 (TransactionPhase.AFTER_COMMIT
) 实现了 @TransactionalEventListener
。
像这样:
@TransactionalEventListener(classes = {SellVoucherEvent.class})
public void dispatch(SellVoucherEvent event) {
sqsClient.sendMessage("queue", turnEventToString(event));
}
此方法在 @Component
class 中实现,并且在我的事务上下文中,我通过 ApplicationEventPublisher
(由 Spring 注入)发布事件。
示例:
private final ApplicationEventPublisher publisher; // injected by Spring
@Transactional
public Result handleNewOrder(OrderData data) {
validatePrices(data);
doSomeInserts(data);
Result result = createResult(data);
SellVoucherEvent event = createEvent();
publisher.publishEvent(event); // publish the application event
return result;
}
然后commit后,调用@TransactionalEventListener
注解的dispatch
方法,然后将事件发送给SQS。这样我们就可以保证事件只有在commit之后才会被处理。
我们有一个销售代金券的系统,这个销售过程必须与另一个系统集成。这种集成通过 AWS SQS 队列进行。
系统 A 处理订单,然后在处理结束时将消息发布到名为
new-orders-queue
的 SQS 队列。系统 B 从
new-orders-queue
中读取数据,进行某种处理,然后将另一个事件发布到另一个名为 [=13 的 SQS 队列=].系统 A 从
中创建的订单another-sqs-queue
读取数据,然后更新在步骤 1
订购流程(上面的第 1 步)很长,但没有什么特别复杂的。它在其数据库 (MySQL) 中进行一些验证,然后将一些插入内容写入某些表。
所有这些都发生在 Spring 的 @Transactional
上下文中。
问题是第 3 步有时会发生 在 来自第 1 步的订单最终提交到数据库之前,这会导致错误(必须更新的订单尚未在数据库中找到,因为它尚未提交)。如果我们稍后重试,该过程将正常进行。这种情况并非总是发生,但我们必须解决这个问题。
你看过了吗?
下面是第 1 步的简化(真实)伪代码:
@Transactional
public Result handleNewOrder(OrderData data) {
SqsClient sqsClient = new SqsClient();
validatePrices(data);
doSomeInserts(data);
Result result = createResult(data);
// the last line of the method, just before the return statement, is the line that post the event to the queue
sqsClient.sendEvent(Events.create(result));
return result;
}
在这个用 @Transactional
注释的方法结束时,应该提交一些东西,但是在提交发生之前不知何故第 3 步已经完成(至少看起来是这样)。
也许将事件发布移出事务边界是解决方案(实际上,我赞成它),因为这样我们可以保证只有在事务提交后才会处理事件到数据库。但是,如果我们与 SQS 的通信出现故障,我们将不得不使用某种重试机制。
这是要走的路还是您有更好的解决方案?
这听起来像是一个需要多个事务的操作。
例如,您可能有两个方法,每个方法都用@Transactional 注释:
@Transactional
public void startHandleNewOrder(OrderData data) {
// make changes to the database here and publish event to new-orders-queue
}
@Transactional
public Result finishHandleNewOrder(OrderData data) {
// await response from another-sqs-queue and compile result
}
这应该工作假设:
- 一个单独的服务 NOT 用 @Transactional 注释(即它在事务屏障之外)按顺序调用这些方法
或者,您可以在没有注释的情况下实现它,如下所示:
@Autowired
PlatformTransactionManager transactionManager;
@PersistenceContext
EntityManager entityManager;
public Result handleNewOrder(OrderData data) {
boolean rollback = true;
TransactionStatus status = getTransaction();
try {
// make changes to the database here and publish event to new-orders-queue
status.commit();
rollback = false;
} finally {
if (rollback)
status.rollback();
}
// this may or may not be necessary if you want to ensure you're reading
// fresh data from the database (otherwise cached from step #1 may be used)
entityManager.clear();
rollback = true;
status = getTransaction();
try {
// wait for a response and compile the result
status.commit();
rollback = false;
return result;
} finally {
if (rollback)
status.rollback();
}
}
private TransactionStatus getTransaction() {
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
return transactionManager.getTransaction(def);
}
最后,按照 M. Deinum 的建议,我用默认阶段 (TransactionPhase.AFTER_COMMIT
) 实现了 @TransactionalEventListener
。
像这样:
@TransactionalEventListener(classes = {SellVoucherEvent.class})
public void dispatch(SellVoucherEvent event) {
sqsClient.sendMessage("queue", turnEventToString(event));
}
此方法在 @Component
class 中实现,并且在我的事务上下文中,我通过 ApplicationEventPublisher
(由 Spring 注入)发布事件。
示例:
private final ApplicationEventPublisher publisher; // injected by Spring
@Transactional
public Result handleNewOrder(OrderData data) {
validatePrices(data);
doSomeInserts(data);
Result result = createResult(data);
SellVoucherEvent event = createEvent();
publisher.publishEvent(event); // publish the application event
return result;
}
然后commit后,调用@TransactionalEventListener
注解的dispatch
方法,然后将事件发送给SQS。这样我们就可以保证事件只有在commit之后才会被处理。