如何使用 Java 8 的 Fork/Join 框架并行化循环

How to parallelize loops with Java 8's Fork/Join framework

如何使用 Java 8 的 Fork/Join 框架并行化循环。实际上我没有使用多线程。我在 SO 中阅读了很多问题。现在我无法在 Java 8 中实现列表的并行处理。任何人都可以帮助我吗?

我尝试过 this link 中的东西。

routes.stream().parallel().forEach(this::doSomething);

场景如基于路由列表的列表我需要分配任务并执行我需要像 foreach 循环的 insted 我希望基于数组大小并行执行。

我的问题是在处理 updateSchedules 服务时花费了太多时间。这就是我想在这里实现线程概念的原因。 scheduleService.updateSchedules(originId, destinationId,req.getJourneyDate());

for (Availabilities ar : routes) {
  try {
    log.info("Starting for bus" + ar);
    Bus bus = new Bus();

    // Get schedule list
    BitlaSchedules schedule = scheduleRepo
      .findByOriginIdAndDestinationIdAndScheduleIdAndTravelIdAndRouteId(originId, 
         destinationId, ar.getScheduleId(), ar.getTravelId(), ar.getRouteId());

    if (schedule == null) {
      scheduleService.updateSchedules(originId, destinationId,req.getJourneyDate());
      schedule = scheduleRepo
        .findByOriginIdAndDestinationIdAndScheduleIdAndTravelIdAndRouteId(originId, 
          destinationId, ar.getScheduleId(), ar.getTravelId(), ar.getRouteId());
    }
  } catch(Exception e) {
    log.error(e.getMessage());
  }
}

可能最基本的错误是您正在尝试这样做。

不要! fork/join 框架是一个非常 的工程——它解决了一个非常特定的领域: - 解决 CPU 密集型问题; - 可以在不共享资源的情况下拆分(即,之间没有同步或锁定)。

您的代码似乎使用了外部服务: - 如果该服务使用任何类型的数据库,那么您的问题并不 CPU 密集; - 即使没有,那么 - 因为有一个明显的 update,所以有一个共享的、可变的状态需要同步(特别是因为我们似乎有多个作者)。

这意味着您使用并行流一无所获

只需使用带线程池的标准执行程序并将您的项目作为任务提交。

正如@fdreger 所说,它只会帮助您完成 CPU 密集型任务。 因此,在做出任何假设为什么某些东西应该 运行 并行以获得性能之前,请帮自己一个忙并介绍一下。大多数时候瓶颈是IO相关的。

我将给您一个非常简单的示例,说明如何在 java 中使用并行流。

public class Test {

    public static void main(String[] args) {
        // some dummy data
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 20; ++i) list.add(i);

        // to simulate some CPU intensive work
        Random random = new SecureRandom();

        List<String> result = list.parallelStream().map(i -> {

            // simulate work load
            int millis = 0;
            try {
                millis = random.nextInt(1000);
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

            // return any desired result
            return "Done something with " + i + " in thread " + Thread.currentThread().getName() + " took " + millis + "ms";
        }).collect(Collectors.toList()); // collect joins - will return once all the workers are done

        // print the result
        result.forEach(System.out::println);    
    }
}