Spring 数据未保存实体
Spring Data not saving an entity
我遇到 Spring 数据未保存实体的问题。
应用逻辑是这样的:
- 另一个应用程序正在侦听负载相当重的 Kafka 主题(每秒数十条消息),并将消息插入 table 状态为 'NEW' 的数据库中。
- @Scheduled 方法加载一个状态为 'NEW' 的实体列表,这些实体被一个一个地转移到一个 FixedThreadPool(20 个线程),它们的状态设置为 'PROCESSING' 和一个在相同的 table.
上调用 saveAll 方法
@Scheduled(fixedDelay = 10_000)
public void scheduled() {
Pageable pageable = PageRequest.of(0, pageSize);
List<EntityClass> entities = entityService.getAllByStatusOrderByIdDesc(NEW_STATUS, pageable);
while (!entities.isEmpty()) {
entities.forEach(entity -> {
entity.setStatus(PROCESSING_STATUS);
process(entity);
});
entityService.saveAll(entities);
loggers = entityService.getAllByStatusOrderByIdDesc(NEW_STATUS, pageable);
}
private void process(Entity entity) {
threadPool.execute(
() -> processor.execute(
TaskBuilder.entity(entity).build()
)
);
}
- 在 FixedThreadPool 的线程中,一些业务逻辑由 TaskProcessor 通过几个 类 发生,导致每个实体的状态更改为 'OK' 或 'ERROR'。
- 业务逻辑的最后一步是将结果状态的实体更新为相同的table,这就是我遇到问题的地方:实体的一部分 不会持续存在并永远保持在 'PROCESSING' 状态。来自最后一步的最后一部分:
...
log.info("[SaveStep] [execute] status before {}", entity.getStatus());
Entity entity = entityService.save(entity);
log.info("[SaveStep] [execute] status after {}", entity.getStatus());
两种日志方法都显示正确的状态('OK' 或 'ERROR'),但数据库行始终包含 'PROCESSING' 状态。我尝试重做一个具有多种变体的 entityService.save() (entityRepository 是一个常规的 JPARepository):
- 常规 repo.save() 不起作用:
public Entity save(Entity entity) {
return repository.save(logger);
}
- 获取、刷新和保存不起作用:
@Transactional
public Entity save(Entity entity) {
Entity saved = repository.getFirstById(entity.getId());
saved.setStatus(entity.getStatus());
return repository.save(saved);
}
- 我认为 Hikari 池大小可能是个问题,但将其增加到 maximum-pool-size=20 甚至 30 都没有任何作用。
- 如果我用记录器包围选项 2,我在两个记录器行中都有正确的状态,但在数据库中没有:
@Transactional
public Entity save(Entity entity) {
Entity saved = repository.getFirstById(entity.getId());
log.info("[Service] [save] status before {}", saved.getStatus());
saved.setStatus(entity.getStatus());
log.info("[Service] [save] status after {}", saved.getStatus());
return repository.save(saved);
}
- 我试过本机查询,但没有成功:
@Modifying
@Transactional
@Query(value = "UPDATE entity_table_name set status = :status where id = :id", nativeQuery = true)
void updateStatus(@Param("id") Long id, @Param("status") String status);
- 我尝试用 repo.saveAndFlush() 替换 repo.save() 但没有效果。
目前我没有想法可以尝试。
主要的侮辱是大多数实体最终被数据库正确消化,只有大约 20% 被 save() 忽略,没有任何抛出异常。
感谢 Taylor,他完全正确地指出了我的问题。实体列表已被并行处理,并以足够快的速度保存正确的状态,甚至 before 调度程序命中他的 saveAll('PROCESSING') 行,这会导致状态重写。
您在将消息提交到池后将其设置为“正在处理”,这意味着池可能会先处理它,然后可能会发生“正在处理”更新。不确定这是否是您的问题,但您正在做的事情有可能。在提交到池之前尝试设置和saving/committing“处理”更新。
我遇到 Spring 数据未保存实体的问题。 应用逻辑是这样的:
- 另一个应用程序正在侦听负载相当重的 Kafka 主题(每秒数十条消息),并将消息插入 table 状态为 'NEW' 的数据库中。
- @Scheduled 方法加载一个状态为 'NEW' 的实体列表,这些实体被一个一个地转移到一个 FixedThreadPool(20 个线程),它们的状态设置为 'PROCESSING' 和一个在相同的 table. 上调用 saveAll 方法
@Scheduled(fixedDelay = 10_000)
public void scheduled() {
Pageable pageable = PageRequest.of(0, pageSize);
List<EntityClass> entities = entityService.getAllByStatusOrderByIdDesc(NEW_STATUS, pageable);
while (!entities.isEmpty()) {
entities.forEach(entity -> {
entity.setStatus(PROCESSING_STATUS);
process(entity);
});
entityService.saveAll(entities);
loggers = entityService.getAllByStatusOrderByIdDesc(NEW_STATUS, pageable);
}
private void process(Entity entity) {
threadPool.execute(
() -> processor.execute(
TaskBuilder.entity(entity).build()
)
);
}
- 在 FixedThreadPool 的线程中,一些业务逻辑由 TaskProcessor 通过几个 类 发生,导致每个实体的状态更改为 'OK' 或 'ERROR'。
- 业务逻辑的最后一步是将结果状态的实体更新为相同的table,这就是我遇到问题的地方:实体的一部分 不会持续存在并永远保持在 'PROCESSING' 状态。来自最后一步的最后一部分:
...
log.info("[SaveStep] [execute] status before {}", entity.getStatus());
Entity entity = entityService.save(entity);
log.info("[SaveStep] [execute] status after {}", entity.getStatus());
两种日志方法都显示正确的状态('OK' 或 'ERROR'),但数据库行始终包含 'PROCESSING' 状态。我尝试重做一个具有多种变体的 entityService.save() (entityRepository 是一个常规的 JPARepository):
- 常规 repo.save() 不起作用:
public Entity save(Entity entity) {
return repository.save(logger);
}
- 获取、刷新和保存不起作用:
@Transactional
public Entity save(Entity entity) {
Entity saved = repository.getFirstById(entity.getId());
saved.setStatus(entity.getStatus());
return repository.save(saved);
}
- 我认为 Hikari 池大小可能是个问题,但将其增加到 maximum-pool-size=20 甚至 30 都没有任何作用。
- 如果我用记录器包围选项 2,我在两个记录器行中都有正确的状态,但在数据库中没有:
@Transactional
public Entity save(Entity entity) {
Entity saved = repository.getFirstById(entity.getId());
log.info("[Service] [save] status before {}", saved.getStatus());
saved.setStatus(entity.getStatus());
log.info("[Service] [save] status after {}", saved.getStatus());
return repository.save(saved);
}
- 我试过本机查询,但没有成功:
@Modifying
@Transactional
@Query(value = "UPDATE entity_table_name set status = :status where id = :id", nativeQuery = true)
void updateStatus(@Param("id") Long id, @Param("status") String status);
- 我尝试用 repo.saveAndFlush() 替换 repo.save() 但没有效果。
目前我没有想法可以尝试。 主要的侮辱是大多数实体最终被数据库正确消化,只有大约 20% 被 save() 忽略,没有任何抛出异常。
感谢 Taylor,他完全正确地指出了我的问题。实体列表已被并行处理,并以足够快的速度保存正确的状态,甚至 before 调度程序命中他的 saveAll('PROCESSING') 行,这会导致状态重写。
您在将消息提交到池后将其设置为“正在处理”,这意味着池可能会先处理它,然后可能会发生“正在处理”更新。不确定这是否是您的问题,但您正在做的事情有可能。在提交到池之前尝试设置和saving/committing“处理”更新。