使用@TransactionalEventListener 在 JPA 事务中发送消息时未提交(丢失)消息
Message are not commited (loss) when using @TransactionalEventListener to send a message in a JPA Transaction
代码背景:
为了复制生产场景,我创建了一个虚拟应用程序,它基本上会在事务中将某些内容保存在数据库中,并且以相同的方法,它 publishEvent 和 publishEvent 向 rabbitMQ 发送消息。
类 和用法
事务从此方法开始。:
@Override
@Transactional
public EmpDTO createEmployeeInTrans(EmpDTO empDto) {
return createEmployee(empDto);
}
该方法将记录保存在DB中,同时触发publishEvent
@Override
public EmpDTO createEmployee(EmpDTO empDTO) {
EmpEntity empEntity = new EmpEntity();
BeanUtils.copyProperties(empDTO, empEntity);
System.out.println("<< In Transaction : "+TransactionSynchronizationManager.getCurrentTransactionName()+" >> Saving data for employee " + empDTO.getEmpCode());
// Record data into a database
empEntity = empRepository.save(empEntity);
// Sending event , this will send the message.
eventPublisher.publishEvent(new ActivityEvent(empDTO));
return createResponse(empDTO, empEntity);
}
这是活动事件
import org.springframework.context.ApplicationEvent;
import com.kuldeep.rabbitMQProducer.dto.EmpDTO;
public class ActivityEvent extends ApplicationEvent {
public ActivityEvent(EmpDTO source) {
super(source);
}
}
这是上述事件的 TransactionalEventListener。
//@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onActivitySave(ActivityEvent activityEvent) {
System.out.println("Activity got event ... Sending message .. ");
kRabbitTemplate.convertAndSend(exchange, routingkey, empDTO);
}
这是 kRabbitTemplate 是这样的 bean 配置:
@Bean
public RabbitTemplate kRabbitTemplate(ConnectionFactory connectionFactory) {
final RabbitTemplate kRabbitTemplate = new RabbitTemplate(connectionFactory);
kRabbitTemplate.setChannelTransacted(true);
kRabbitTemplate.setMessageConverter(kJsonMessageConverter());
return kRabbitTemplate;
}
问题定义
当我使用上述代码流保存记录并在 rabbitMQ 上发送消息时,我的消息未在服务器上传递意味着它们丢失了。
我对 AMQP 中事务的理解是:
- 如果模板已处理,但未从 Spring/JPA 事务调用 convertAndSend,则在模板的 convertAndSend 方法中提交消息。
// this is a snippet from org.springframework.amqp.rabbit.core.RabbitTemplate.doSend()
if (isChannelLocallyTransacted(channel)) {
// Transacted channel created by this template -> commit.
RabbitUtils.commitIfNecessary(channel);
}
- 但是如果模板被处理并且从 Spring/JPA 事务调用 convertAndSend 那么 doSend 方法中的这个 isChannelLocallyTransacted 将评估为 false 并且提交将在启动 Spring/JPA 事务的方法中完成。
我在调查上面代码中消息丢失的原因后发现的。
- Spring 事务在我调用 convertAndSend 方法时处于活动状态,因此它应该在 Spring 事务中提交消息。
- 为此,RabbitTemplate 在 org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils 的 bindResourceToTransaction 中发送消息之前绑定资源并注册同步。
public static RabbitResourceHolder bindResourceToTransaction(RabbitResourceHolder resourceHolder,
ConnectionFactory connectionFactory, boolean synched) {
if (TransactionSynchronizationManager.hasResource(connectionFactory)
|| !TransactionSynchronizationManager.isActualTransactionActive() || !synched) {
return (RabbitResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory); // NOSONAR never null
}
TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolder);
resourceHolder.setSynchronizedWithTransaction(true);
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(new RabbitResourceSynchronization(resourceHolder,
connectionFactory));
}
return resourceHolder;
}
在我的代码中,资源绑定后,无法注册同步,因为TransactionSynchronizationManager.isSynchronizationActive()==false
。由于它无法注册同步,因此 spring rabbitMQ 消息没有发生提交,因为 AbstractPlatformTransactionManager.triggerAfterCompletion
为每个同步调用 RabbitMQ 的提交。
因为上面的问题我遇到了什么问题
- 消息未在 spring 事务中提交,因此消息丢失。
- 由于在 bindResourceToTransaction 中添加了资源,因此该资源保持绑定状态,不允许添加该资源以供任何其他消息在同一线程中发送。
TransactionSynchronizationManager.isSynchronizationActive()==false
的可能根本原因
- 我发现启动事务的方法删除了 org.springframework.transaction.support.AbstractPlatformTransactionManager class 的 triggerAfterCompletion 中的同步。因为
status.isNewSynchronization()
在数据库操作后评估了 true
(如果我在没有 ApplicationEvent 的情况下调用 convertAndSend 通常不会发生这种情况)。
private void triggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) {
if (status.isNewSynchronization()) {
List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
TransactionSynchronizationManager.clearSynchronization();
if (!status.hasTransaction() || status.isNewTransaction()) {
if (status.isDebug()) {
logger.trace("Triggering afterCompletion synchronization");
}
// No transaction or new transaction for the current scope ->
// invoke the afterCompletion callbacks immediately
invokeAfterCompletion(synchronizations, completionStatus);
}
else if (!synchronizations.isEmpty()) {
// Existing transaction that we participate in, controlled outside
// of the scope of this Spring transaction manager -> try to register
// an afterCompletion callback with the existing (JTA) transaction.
registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations);
}
}
}
我在这个问题上做了什么来克服
我只是在 onActivitySave 方法中添加 @Transactional(propagation = Propagation.REQUIRES_NEW)
和 on @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
并且它在启动新事务时起作用。
我需要知道的
- 为什么在使用 ApplicationEvent 时在 triggerAfterCompletion 方法中
status.isNewSynchronization
?
- 如果事务应该在父方法中终止,为什么我在 Listner class 中得到
TransactionSynchronizationManager.isActualTransactionActive()==true
?
- 如果实际交易活动,是否应该删除同步?
- 在 bindResourceToTransaction 中,spring AMQP 假设没有同步的活动事务吗?如果答案是肯定的,为什么不同步。如果没有激活就初始化?
- 如果我正在传播一个新交易,那么我将丢失父交易,有没有更好的方法来做到这一点?
请帮助我解决这个问题,这是一个热门的生产问题,我不太确定我所做的修复。
这是一个错误; RabbitMQ 事务代码比 @TransactionalEventListener
代码早很多年。
问题是,在这种配置下,我们处于准事务状态,虽然确实有一个事务在处理中,但同步已经被清除,因为事务已经提交。
使用 @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
有效。
我看到你已经提出了一个问题:
https://github.com/spring-projects/spring-amqp/issues/1309
以后有问题最好在这里提问,或者觉得有bug就提出issue。不要两者都做。
代码背景:
为了复制生产场景,我创建了一个虚拟应用程序,它基本上会在事务中将某些内容保存在数据库中,并且以相同的方法,它 publishEvent 和 publishEvent 向 rabbitMQ 发送消息。
类 和用法
事务从此方法开始。:
@Override
@Transactional
public EmpDTO createEmployeeInTrans(EmpDTO empDto) {
return createEmployee(empDto);
}
该方法将记录保存在DB中,同时触发publishEvent
@Override
public EmpDTO createEmployee(EmpDTO empDTO) {
EmpEntity empEntity = new EmpEntity();
BeanUtils.copyProperties(empDTO, empEntity);
System.out.println("<< In Transaction : "+TransactionSynchronizationManager.getCurrentTransactionName()+" >> Saving data for employee " + empDTO.getEmpCode());
// Record data into a database
empEntity = empRepository.save(empEntity);
// Sending event , this will send the message.
eventPublisher.publishEvent(new ActivityEvent(empDTO));
return createResponse(empDTO, empEntity);
}
这是活动事件
import org.springframework.context.ApplicationEvent;
import com.kuldeep.rabbitMQProducer.dto.EmpDTO;
public class ActivityEvent extends ApplicationEvent {
public ActivityEvent(EmpDTO source) {
super(source);
}
}
这是上述事件的 TransactionalEventListener。
//@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onActivitySave(ActivityEvent activityEvent) {
System.out.println("Activity got event ... Sending message .. ");
kRabbitTemplate.convertAndSend(exchange, routingkey, empDTO);
}
这是 kRabbitTemplate 是这样的 bean 配置:
@Bean
public RabbitTemplate kRabbitTemplate(ConnectionFactory connectionFactory) {
final RabbitTemplate kRabbitTemplate = new RabbitTemplate(connectionFactory);
kRabbitTemplate.setChannelTransacted(true);
kRabbitTemplate.setMessageConverter(kJsonMessageConverter());
return kRabbitTemplate;
}
问题定义
当我使用上述代码流保存记录并在 rabbitMQ 上发送消息时,我的消息未在服务器上传递意味着它们丢失了。
我对 AMQP 中事务的理解是:
- 如果模板已处理,但未从 Spring/JPA 事务调用 convertAndSend,则在模板的 convertAndSend 方法中提交消息。
// this is a snippet from org.springframework.amqp.rabbit.core.RabbitTemplate.doSend()
if (isChannelLocallyTransacted(channel)) {
// Transacted channel created by this template -> commit.
RabbitUtils.commitIfNecessary(channel);
}
- 但是如果模板被处理并且从 Spring/JPA 事务调用 convertAndSend 那么 doSend 方法中的这个 isChannelLocallyTransacted 将评估为 false 并且提交将在启动 Spring/JPA 事务的方法中完成。
我在调查上面代码中消息丢失的原因后发现的。
- Spring 事务在我调用 convertAndSend 方法时处于活动状态,因此它应该在 Spring 事务中提交消息。
- 为此,RabbitTemplate 在 org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils 的 bindResourceToTransaction 中发送消息之前绑定资源并注册同步。
public static RabbitResourceHolder bindResourceToTransaction(RabbitResourceHolder resourceHolder,
ConnectionFactory connectionFactory, boolean synched) {
if (TransactionSynchronizationManager.hasResource(connectionFactory)
|| !TransactionSynchronizationManager.isActualTransactionActive() || !synched) {
return (RabbitResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory); // NOSONAR never null
}
TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolder);
resourceHolder.setSynchronizedWithTransaction(true);
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(new RabbitResourceSynchronization(resourceHolder,
connectionFactory));
}
return resourceHolder;
}
在我的代码中,资源绑定后,无法注册同步,因为TransactionSynchronizationManager.isSynchronizationActive()==false
。由于它无法注册同步,因此 spring rabbitMQ 消息没有发生提交,因为 AbstractPlatformTransactionManager.triggerAfterCompletion
为每个同步调用 RabbitMQ 的提交。
因为上面的问题我遇到了什么问题
- 消息未在 spring 事务中提交,因此消息丢失。
- 由于在 bindResourceToTransaction 中添加了资源,因此该资源保持绑定状态,不允许添加该资源以供任何其他消息在同一线程中发送。
TransactionSynchronizationManager.isSynchronizationActive()==false
- 我发现启动事务的方法删除了 org.springframework.transaction.support.AbstractPlatformTransactionManager class 的 triggerAfterCompletion 中的同步。因为
status.isNewSynchronization()
在数据库操作后评估了true
(如果我在没有 ApplicationEvent 的情况下调用 convertAndSend 通常不会发生这种情况)。
private void triggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) {
if (status.isNewSynchronization()) {
List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
TransactionSynchronizationManager.clearSynchronization();
if (!status.hasTransaction() || status.isNewTransaction()) {
if (status.isDebug()) {
logger.trace("Triggering afterCompletion synchronization");
}
// No transaction or new transaction for the current scope ->
// invoke the afterCompletion callbacks immediately
invokeAfterCompletion(synchronizations, completionStatus);
}
else if (!synchronizations.isEmpty()) {
// Existing transaction that we participate in, controlled outside
// of the scope of this Spring transaction manager -> try to register
// an afterCompletion callback with the existing (JTA) transaction.
registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations);
}
}
}
我在这个问题上做了什么来克服
我只是在 onActivitySave 方法中添加 @Transactional(propagation = Propagation.REQUIRES_NEW)
和 on @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
并且它在启动新事务时起作用。
我需要知道的
- 为什么在使用 ApplicationEvent 时在 triggerAfterCompletion 方法中
status.isNewSynchronization
? - 如果事务应该在父方法中终止,为什么我在 Listner class 中得到
TransactionSynchronizationManager.isActualTransactionActive()==true
? - 如果实际交易活动,是否应该删除同步?
- 在 bindResourceToTransaction 中,spring AMQP 假设没有同步的活动事务吗?如果答案是肯定的,为什么不同步。如果没有激活就初始化?
- 如果我正在传播一个新交易,那么我将丢失父交易,有没有更好的方法来做到这一点?
请帮助我解决这个问题,这是一个热门的生产问题,我不太确定我所做的修复。
这是一个错误; RabbitMQ 事务代码比 @TransactionalEventListener
代码早很多年。
问题是,在这种配置下,我们处于准事务状态,虽然确实有一个事务在处理中,但同步已经被清除,因为事务已经提交。
使用 @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
有效。
我看到你已经提出了一个问题:
https://github.com/spring-projects/spring-amqp/issues/1309
以后有问题最好在这里提问,或者觉得有bug就提出issue。不要两者都做。