使用 CompletableFuture 的并发数据库调用

concurrent DB calls using CompletableFuture

我正在尝试提高我的应用程序仪表板页面的性能。它是一个 spring 启动应用程序,休眠用于 Dao 层。 我需要针对 5 种不同的警报类型(基本上是过滤器)获取参与者的数量。

到目前为止,我正在尝试并发启动数据库查询,我已经这样做了:

  Map<Long,CompletableFuture<Long>> furtureMap = new HashMap<>();
      Map<Long, Long> alertMap = new HashMap<>();
      if(CollectionUtils.isNotEmpty(alertList)) {

        for(AlertMasterI18N alertMasterI18N : alertList) {
          dashboardFilterDto.setAlertId(alertMasterI18N.getAlertMaster().getId());
          setDatesForDashboardFilterDto(dashboardFilterDto);
          CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
            try {
              return dashboardDao.getParticipantsCount(dashboardFilterDto);
            } catch (DaoException e) {
              e.printStackTrace();
            }
            return 0L;
          });
          furtureMap.put(alertMasterI18N.getAlertMaster().getId(), future);
        }
       /* CompletableFuture.allOf(furtureMap.values().toArray(new CompletableFuture[furtureMap.size()])).get(); */
       furtureMap.entrySet().stream().forEach( entry -> {
         entry.getValue().thenAccept( count -> alertMap.put(entry.getKey(),count));
       });
      }

不知何故,我得到的结果比我通过顺序执行得到的结果更奇怪。所有警报都会返回相同的计数,有时它是 0,而如果我还原代码,它会给出正确的结果。 我也尝试过在其上使用 CompletableFuture.allOf 和 callig get() 但那次所有警报的计数始终为 0.

这是我第一次尝试异步编程 谁能告诉我这里有什么我遗漏的吗?

假设 DAO 层一切正常,我能在提供的代码中看到的唯一缺失点是最后调用 get()join()。如果没有此调用,CompletableFuture 的 none 将实际执行。

以下大纲添加所有 newly 在最后一个 forEach 循环中创建 CompletableFuture ,然后等待它们。

    Map<Long,CompletableFuture<Long>> furtureMap = new HashMap<>();
    Map<Long, Long> alertMap = new HashMap<>();
    if(CollectionUtils.isNotEmpty(alertList)) {

        for (AlertMasterI18N alertMasterI18N : alertList) {
            dashboardFilterDto.setAlertId(alertMasterI18N.getAlertMaster().getId());
            setDatesForDashboardFilterDto(dashboardFilterDto);
            CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
                try {
                    return dashboardDao.getParticipantsCount(dashboardFilterDto);
                } catch (DaoException e) {
                    e.printStackTrace();
                }
                return 0L;
            });
            furtureMap.put(alertMasterI18N.getAlertMaster().getId(), future);
        }
        List<CompletableFuture<Void>> finalCollector = new ArrayList<>();
        /* CompletableFuture.allOf(furtureMap.values().toArray(new CompletableFuture[furtureMap.size()])).get(); */
        furtureMap.entrySet().stream().forEach(entry -> {
            finalCollector.add(entry.getValue()
                    .thenAccept(count -> alertMap.put(entry.getKey(), count)));
        });
        /* Wait for all the results */
        CompletableFuture
                .allOf(finalCollector.toArray(new CompletableFuture[finalCollector.size()]))
                .join();
    }

如果这不起作用,那么很可能是 DAO 层中出现了问题,导致异步执行失败(而顺序执行成功)。分享您如何在 DAO 层中获取数据将进一步阐明问题的根源。