在 Spring Boot 的一个事务中组发送 kafka 消息和数据库更新
Group send kafka message and DB update in one Transaction in SpringBoot
我需要在一笔交易中执行多项操作
- 生成kafka消息
- 更新Table一个
- 更新TableB
我可以发送消息并且不更新两个表(A 和 B)。我不能生成消息并更新其中一个表。
我正在尝试使用 @Transactional
注释来实现我的目标
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.SERIALIZABLE)
public void handle(Event approvalEvent) {
var entity = entityService.getLatestVersion(approvalEvent.getTransactionId());
entityService.approve(entity.getTransactionId());
logService.logApproval(entity);
producer.send(approvalEvent);
}
我做对了吗?
上述方法的问题是您在一个事务中与两个不同的系统(数据库和消息队列)进行交互。在一个系统上操作成功而在另一个系统上操作失败时要处理的场景组合使解决方案变得复杂。
微服务世界中存在处理完全相同场景的模式。它被称为发件箱模式。
您可以阅读更多相关信息 here。
简短的总结是您的数据库中有一个额外的 table 称为发件箱,其中包含要发布到消息队列的消息。
在 adding\updating 实体的数据库事务中,您在发件箱 table 工具中插入一行,其中包含实体操作的详细信息。
然后您从发件箱 table 异步读取行并通过轮询或使用更改数据捕获发布到消息队列。查看使用 debezium 的示例实现 here。
您的交易代码如下所示。
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.SERIALIZABLE)
public void handle(Event approvalEvent) {
var entity = entityService.getLatestVersion(approvalEvent.getTransactionId());
entityService.approve(entity.getTransactionId());
logService.logApproval(entity);
//Outbox is the table containing the records to be published to MQ
outboxRepo.save(approvalEvent);
}
我需要在一笔交易中执行多项操作
- 生成kafka消息
- 更新Table一个
- 更新TableB
我可以发送消息并且不更新两个表(A 和 B)。我不能生成消息并更新其中一个表。
我正在尝试使用 @Transactional
注释来实现我的目标
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.SERIALIZABLE)
public void handle(Event approvalEvent) {
var entity = entityService.getLatestVersion(approvalEvent.getTransactionId());
entityService.approve(entity.getTransactionId());
logService.logApproval(entity);
producer.send(approvalEvent);
}
我做对了吗?
上述方法的问题是您在一个事务中与两个不同的系统(数据库和消息队列)进行交互。在一个系统上操作成功而在另一个系统上操作失败时要处理的场景组合使解决方案变得复杂。
微服务世界中存在处理完全相同场景的模式。它被称为发件箱模式。
您可以阅读更多相关信息 here。
简短的总结是您的数据库中有一个额外的 table 称为发件箱,其中包含要发布到消息队列的消息。
在 adding\updating 实体的数据库事务中,您在发件箱 table 工具中插入一行,其中包含实体操作的详细信息。
然后您从发件箱 table 异步读取行并通过轮询或使用更改数据捕获发布到消息队列。查看使用 debezium 的示例实现 here。
您的交易代码如下所示。
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.SERIALIZABLE)
public void handle(Event approvalEvent) {
var entity = entityService.getLatestVersion(approvalEvent.getTransactionId());
entityService.approve(entity.getTransactionId());
logService.logApproval(entity);
//Outbox is the table containing the records to be published to MQ
outboxRepo.save(approvalEvent);
}