Java 8:如何将 for 循环并行转换为 运行?

Java 8: How can I convert a for loop to run in parallel?

for (int i=0; i<100000; i++) {
    // REST API request. 
restTemplate.exchange(url, HttpMethod.GET, request, String.class);
}

我有一种情况,我必须为 10 万个用户请求资源,并且需要 70 分钟才能完成。我试图尽可能多地清理我的代码,但我只能将它减少 4 分钟)。

由于每个请求都是相互独立的,我希望并行发送请求(可能是 10 秒、100 秒,甚至是 1000 秒的块,每个块都很快完成)。我希望我可以将时间减少到 10 分钟或接近的时间。我如何计算哪个块大小可以快速完成工作?

我找到了下面的方法,但我不知道程序是否一次处理了所有 20 个;或一次 5 个;或一次 10 个。

IntStream.range(0,20).parallel().forEach(i->{
     ... do something here
});

感谢您的帮助。我愿意接受任何建议或批评!!

更新: 我能够使用 IntStream 并且任务在 28 分钟内完成。但我不确定这是我能做的最好的。

根据您的情况,您可以使用 fork/join 框架或创建执行程序线程服务池。

      ExecutorService service = null;
    try {

        service = Executors.newFixedThreadPool(8);
        service.submit(() -> {

            //do your task
        });
    } catch (Exception e) {
    } finally {
        if (service != null) {
            service.shutdown();
        }

    }
    service.awaitTermination(1, TimeUnit.MINUTES);
    if(service.isTerminated())
        System.out.println("All threads have been finished");
    else 
        System.out.println("At least one thread running");

并使用fork/join框架

    class RequestHandler extends RecursiveAction {

    int start;
    int end;

    public RequestHandler(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= 10) {

            //REST Request
        } else {

            int middle = start + (end - start) / 2;
            invokeAll(new RequestHandler(start, middle), new RequestHandler(middle, end));
        }

    }

}

Public class MainClass{
   public void main(String[] args){

       ForkJoinTask<?> task = new RequestHandler(0, 100000);
       ForkJoinPool pool = new ForkJoinPool();
       pool.invoke(task);
   }
}

parallel() 的标准调用将为您的机器可用的每个核心减去一个核心创建一个线程,使用 Common Fork Join Pool

如果要自己指定并行度,会有不同的可能:

  1. 更改公共池的并行度:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")
  2. 使用自己的池:

示例:

int allRequestsCount = 20;
int parallelism = 4; // Vary on your own

ForkJoinPool forkJoinPool = new ForkJoinPool(parallelism);
IntStream.range(0, parallelism).forEach(i -> forkJoinPool.submit(() -> {
  int chunkSize = allRequestsCount / parallelism;
  IntStream.range(i * chunkSize, i * chunkSize + chunkSize)
           .forEach(num -> {

             // Simulate long running operation
             try {
               Thread.sleep(1000);
             } catch (InterruptedException e) {
               e.printStackTrace();
             }

             System.out.println(Thread.currentThread().getName() + ": " + num);
           });
}));

这个实现只是为了给你一个想法的示例。

我已经写了一篇关于那个的短文。它包含允许您控制池大小的简单工具:

https://gt-dev.blogspot.com/2016/07/java-8-threads-parallel-stream-how-to.html

我在 Java 8 中使用了以下代码,它完成了工作。我能够将批处理作业从 28 分钟减少到 运行 到 3:39 分钟。

IntStream.range(0, 100000).parallel().forEach(i->{
     restTemplate.exchange(url, HttpMethod.GET, request, String.class);
}
});