JAVA:将列表拆分为更小的列表,然后将它们流式传输到多个线程中

JAVA: Split list into smaller lists and then stream them in multiple threads

我有一个数据库,其中有一个 table 和链接。

我发现我可以借助分区将一个列表拆分成更小的列表。 根据这篇文章,分区 class 似乎是最快的 (https://e.printstacktrace.blog/divide-a-list-to-lists-of-n-size-in-Java-8/)

在我将它们分成更小的列表后,我想使用这些链接并同时从中抓取数据。我本可以使用一个列表,然后:

linkList.parallelStream().forEach(link -> {
        ScrapeLink(link);});

并设置 System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "5");

但在我的例子中,我想将它们拆分成更小的列表,然后将 parallelStream 转换为另一种方法,我使用 ScraperAPI 在一个会话中对每个链接进行拆分(使用 session_number 通过设置重用相同的代理例如 session_number=123。)

所以当我有一个这样的列表时: 最终列表链接 = Arrays.asList("link1","link2","link3","link4","link5","link6","link7");

System.out.println(Partition.ofSize(数字, 3));

我会有 [[link1, link2, link3], [link4, link5, link6], [link7]] 但是,当我想在多个线程中同时处理这些小链接列表时,我该怎么做呢?

我的想法是使用 Java 8 个流。但他们可能是更好的方法?

您可以使用默认的 forkjoinpool(您提到的容量为 5)

还有一个为您的子列表定义的自定义线程池。

所以你需要先像这样制作一个可运行的class,稍后你将提交到你的“新”线程池

    @AllArgsConstructor
    public void LinkProcessorTask implements Runnable {
        private String link;
        
        @Override
        public void run() {
            //do something with your link in the sublist
        }
    }
    

    public void doWork() {

      List<List<String>> subListsOfLinks = .... // partitioning function

      subListsOfLinks.parallelStream().forEach(list -> {
          ExecutorService executorService = Executors.newFixedThreadPool(4 //concurrency);
          for(String link: list) {
              LinkProcessorTask linkProcessorTask = new LinkProcessorTask(link);
              executorService.submit(linkProcessorTask);
              executorService.awaitTermination(//Timeout);

          }
      })
    }

现在由您自己决定是否要使这个新线程池成为具有固定并发性的全局线程池。或者您想在 ForkJoinPool 中调用。

如果你进去,total number of threads spawned = ForkJoinPoolConcurrency * CustomThreadPoolConcurrency.

否则就是ForkJoinPoolConcurrency + CustomThreadPoolConcurrency.

取决于您的机器等,多种因素。

如果您想等待一组中的所有链接先完成,然后再继续,则可以使用 CountDownLatch 避免繁重的 awaitTermination 方法。

不要使用流来安排工作,将输入流(通过迭代器)连接到您的工作人员很容易。 这不是必需的(除非任务真的很快)但是,如果由于某种原因你需要分块获取数据,你可以直接这样做。

原因是您没有处理流式数据,您可以更好地控制任务的执行方式。

例如:

import lombok.SneakyThrows;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;


public class SharedJobPool {

    @SneakyThrows
    public static void main(String... args) {

        int WORKERS = 5;
        int CHUNK = 5;

        Iterator<Integer> jobs = IntStream.range(0, 47).iterator();

        List<Thread> workers = IntStream.range(0, WORKERS).mapToObj(j -> new Thread(() -> {
            while (true) {
                int[] chunk = new int[CHUNK];
                int size = 0;
                synchronized (jobs) {
                    while (size < CHUNK && jobs.hasNext())
                        chunk[size++] = jobs.next();
                }
                if (size == 0)
                    break;
                slowJobProccesor(j, chunk);
            }
        })).collect(Collectors.toList());

        for (Thread worker : workers)
            worker.start();

        for (Thread worker : workers)
            worker.join();

    }

    @SneakyThrows
    private static void slowJobProccesor(int j, int[] n) {
        Thread.sleep(ThreadLocalRandom.current().nextInt(1_000, 1_500));
        System.out.printf("    Thread #%d done job: %s%n", j, Arrays.stream(n).mapToObj(Integer::toString).collect(Collectors.joining(", ")));
    }
}

有输出

Thread #1 done job: 5, 6, 7, 8, 9
Thread #2 done job: 10, 11, 12, 13, 14
Thread #4 done job: 20, 21, 22, 23, 24
Thread #3 done job: 15, 16, 17, 18, 19
Thread #0 done job: 0, 1, 2, 3, 4
Thread #2 done job: 30, 31, 32, 33, 34
Thread #1 done job: 25, 26, 27, 28, 29
Thread #4 done job: 35, 36, 37, 38, 39
Thread #3 done job: 40, 41, 42, 43, 44
Thread #0 done job: 45, 46, 0, 0, 0