Spring 数据未保存实体

Spring Data not saving an entity

我遇到 Spring 数据未保存实体的问题。 应用逻辑是这样的:

  1. 另一个应用程序正在侦听负载相当重的 Kafka 主题(每秒数十条消息),并将消息插入 table 状态为 'NEW' 的数据库中。
  2. @Scheduled 方法加载一个状态为 'NEW' 的实体列表,这些实体被一个一个地转移到一个 FixedThreadPool(20 个线程),它们的状态设置为 'PROCESSING' 和一个在相同的 table.
  3. 上调用 saveAll 方法
@Scheduled(fixedDelay = 10_000)
public void scheduled() {
    Pageable pageable = PageRequest.of(0, pageSize);
    List<EntityClass> entities = entityService.getAllByStatusOrderByIdDesc(NEW_STATUS, pageable);

    while (!entities.isEmpty()) {
        entities.forEach(entity -> {
            entity.setStatus(PROCESSING_STATUS);
            process(entity);
        });

        entityService.saveAll(entities);
        loggers = entityService.getAllByStatusOrderByIdDesc(NEW_STATUS, pageable);
    }

private void process(Entity entity) {
    threadPool.execute(
         () -> processor.execute(
            TaskBuilder.entity(entity).build()
         )
    );
}
  1. 在 FixedThreadPool 的线程中,一些业务逻辑由 TaskProcessor 通过几个 类 发生,导致每个实体的状态更改为 'OK' 或 'ERROR'。
  2. 业务逻辑的最后一步是将结果状态的实体更新为相同的table,这就是我遇到问题的地方:实体的一部分 不会持续存在并永远保持在 'PROCESSING' 状态。来自最后一步的最后一部分:
...
log.info("[SaveStep] [execute] status before {}", entity.getStatus());
Entity entity = entityService.save(entity);
log.info("[SaveStep] [execute] status after {}", entity.getStatus());

两种日志方法都显示正确的状态('OK' 或 'ERROR'),但数据库行始终包含 'PROCESSING' 状态。我尝试重做一个具有多种变体的 entityService.save() (entityRepository 是一个常规的 JPARepository):

  1. 常规 repo.save() 不起作用:
public Entity save(Entity entity) {
   return repository.save(logger);
}
  1. 获取、刷新和保存不起作用:
@Transactional
public Entity save(Entity entity) {
   Entity saved = repository.getFirstById(entity.getId());
   saved.setStatus(entity.getStatus());
   return repository.save(saved);
}
  1. 我认为 Hikari 池大小可能是个问题,但将其增加到 maximum-pool-size=20 甚至 30 都没有任何作用。
  2. 如果我用记录器包围选项 2,我在两个记录器行中都有正确的状态,但在数据库中没有:
@Transactional
public Entity save(Entity entity) {
   Entity saved = repository.getFirstById(entity.getId());
   log.info("[Service] [save] status before {}", saved.getStatus());
   saved.setStatus(entity.getStatus());
   log.info("[Service] [save] status after {}", saved.getStatus());
   return repository.save(saved);
}
  1. 我试过本机查询,但没有成功:
@Modifying
@Transactional
@Query(value = "UPDATE entity_table_name set status = :status where id = :id", nativeQuery = true)
void updateStatus(@Param("id") Long id, @Param("status") String status);
  1. 我尝试用 repo.saveAndFlush() 替换 repo.save() 但没有效果。

目前我没有想法可以尝试。 主要的侮辱是大多数实体最终被数据库正确消化,只有大约 20% 被 save() 忽略,没有任何抛出异常。

感谢 Taylor,他完全正确地指出了我的问题。实体列表已被并行处理,并以足够快的速度保存正确的状态,甚至 before 调度程序命中他的 saveAll('PROCESSING') 行,这会导致状态重写。

您在将消息提交到池后将其设置为“正在处理”,这意味着池可能会先处理它,然后可能会发生“正在处理”更新。不确定这是否是您的问题,但您正在做的事情有可能。在提交到池之前尝试设置和saving/committing“处理”更新。