事务提交发生得太晚(在事件被另一个系统处理之后)

Commit of transaction happening too late (after the event is processed by another system)

我们有一个销售代金券的系统,这个销售过程必须与另一个系统集成。这种集成通过 AWS SQS 队列进行。

  1. 系统 A 处理订单,然后在处理结束时将消息发布到名为 new-orders-queue 的 SQS 队列。

  2. 系统 Bnew-orders-queue 中读取数据,进行某种处理,然后将另一个事件发布到另一个名为 [=13 的 SQS 队列=].

  3. 系统 Aanother-sqs-queue 读取数据,然后更新在步骤 1

    中创建的订单

订购流程(上面的第 1 步)很长,但没有什么特别复杂的。它在其数据库 (MySQL) 中进行一些验证,然后将一些插入内容写入某些表。

所有这些都发生在 Spring 的 @Transactional 上下文中。

问题是第 3 步有时会发生 来自第 1 步的订单最终提交到数据库之前,这会导致错误(必须更新的订单尚未在数据库中找到,因为它尚未提交)。如果我们稍后重试,该过程将正常进行。这种情况并非总是发生,但我们必须解决这个问题。

你看过了吗?

下面是第 1 步的简化(真实)伪代码:

@Transactional
public Result handleNewOrder(OrderData data) {
    SqsClient sqsClient = new SqsClient();
    validatePrices(data);
    doSomeInserts(data);
    Result result = createResult(data);
    
    // the last line of the method, just before the return statement, is the line that post the event to the queue
    sqsClient.sendEvent(Events.create(result));
    
    return result;
}

在这个用 @Transactional 注释的方法结束时,应该提交一些东西,但是在提交发生之前不知何故第 3 步已经完成(至少看起来是这样)。

也许将事件发布移出事务边界是解决方案(实际上,我赞成它),因为这样我们可以保证只有在事务提交后才会处理事件到数据库。但是,如果我们与 SQS 的通信出现故障,我们将不得不使用某种重试机制。

这是要走的路还是您有更好的解决方案?

这听起来像是一个需要多个事务的操作。

例如,您可能有两个方法,每个方法都用@Transactional 注释:

@Transactional
public void startHandleNewOrder(OrderData data) {
  // make changes to the database here and publish event to new-orders-queue
}

@Transactional
public Result finishHandleNewOrder(OrderData data) {
  // await response from another-sqs-queue and compile result
}

这应该工作假设:

  1. 一个单独的服务 NOT 用 @Transactional 注释(即它在事务屏障之外)按顺序调用这些方法

或者,您可以在没有注释的情况下实现它,如下所示:

@Autowired
PlatformTransactionManager transactionManager;

@PersistenceContext
EntityManager entityManager;

public Result handleNewOrder(OrderData data) {
  boolean rollback = true;
  TransactionStatus status = getTransaction();
  try {
    // make changes to the database here and publish event to new-orders-queue
    status.commit();
    rollback = false;
  } finally {
    if (rollback)
      status.rollback();
  }

  // this may or may not be necessary if you want to ensure you're reading
  // fresh data from the database (otherwise cached from step #1 may be used)
  entityManager.clear(); 

  rollback = true;
  status = getTransaction();
  try {
    // wait for a response and compile the result
    status.commit();
    rollback = false;
    return result;
  } finally {
    if (rollback)
      status.rollback();
  }
}

private TransactionStatus getTransaction() {
  DefaultTransactionDefinition def = new DefaultTransactionDefinition();
  def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
  return transactionManager.getTransaction(def);
}

最后,按照 M. Deinum 的建议,我用默认阶段 (TransactionPhase.AFTER_COMMIT) 实现了 @TransactionalEventListener

像这样:

@TransactionalEventListener(classes = {SellVoucherEvent.class})
public void dispatch(SellVoucherEvent event) {
    sqsClient.sendMessage("queue", turnEventToString(event));
}

此方法在 @Component class 中实现,并且在我的事务上下文中,我通过 ApplicationEventPublisher(由 Spring 注入)发布事件。

示例:

private final ApplicationEventPublisher publisher; // injected by Spring

@Transactional
public Result handleNewOrder(OrderData data) {
    validatePrices(data);
    doSomeInserts(data);
    Result result = createResult(data);
    
    SellVoucherEvent event = createEvent();
    publisher.publishEvent(event); // publish the application event
    
    return result;
}

然后commit后,调用@TransactionalEventListener注解的dispatch方法,然后将事件发送给SQS。这样我们就可以保证事件只有在commit之后才会被处理。