在 Java 多线程中管理资源

Managing resource in Java multithreading

假设:

  1. 有 10 个任务
  2. 有 3 个线程
  3. 有 3 个数组列表
  4. 每个线程都与 1 个 arrayList 相关

我的问题是:如何管理资源?例如,第 3 步任务将使用 3 个线程和 3 个 arrayLists 执行。然后只要线程可用,第 4 个任务将与可用的 arrayList 一起执行。如何在 Java 中做到这一点?请给我参考或源代码。

public class NewClass {

    private Queue<Integer> availableCore = new LinkedList();

    public void manage() throws InterruptedException {
        ArrayList<ArrayList<Integer>> data = new ArrayList<>();
        data.add(new ArrayList<>());
        data.add(new ArrayList<>());
        data.add(new ArrayList<>());

        availableCore.add(0);
        availableCore.add(1);
        availableCore.add(2);

        ArrayList<Calculate> allThreads = new ArrayList<>();
        ExecutorService es = Executors.newWorkStealingPool(3);
        int threadCount = -1;

        int numOfTask = 10;

        while (numOfTask > 0) {
            if (!availableCore.isEmpty()) {
                System.out.println(availableCore.element());
                threadCount++;
                int core = availableCore.remove();
                allThreads.add(new Calculate(data.get(core), numOfTask, core));
                es.execute(allThreads.get(threadCount));
                numOfTask--;
            }
        }

        es.shutdown();
        boolean finished = es.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("Done\n");

        for (int i = 0; i < data.size(); i++) {
            System.out.println(data.get(i));
        }
    }

    public class Calculate extends Thread {

        private ArrayList<Integer> data;
        private int start;
        private int core;

        public Calculate(ArrayList<Integer> data, int start, int core) {
            this.data = data;
            this.start = start;
            this.core = core;
        }

        @Override
        public void run() {
            this.data.add(start);
            availableCore.add(this.core);
        }

        public int getCore() {
            return core;
        }
    }
}

上面这段代码代表了我的实际问题。当我多次尝试 运行 时,有时会在“availableCore.element()”中出错。上面说availableCore是空的,但是我做了一个条件确保它不为空。

if (!availableCore.isEmpty()) {
            System.out.println(availableCore.element());

My question is : how to manage the resources? for example, first step 4 tasks will be executed using 4 core and 4 arrayList. Then whenever a core is available, the 5th task will be executed along with available arrayList. How to do that in Java? please give me reference or source code.

AFAIK,Java 本身没有将给定线程显式映射到给定核心的功能(即, 线程关联)。您可以通过显式系统调用来完成,例如在 Linux OS 中使用 taskset 命令。但这需要您做很多工作,并且可能会因此失去可移植性。

你最好使用更高级别的 Java 摘要(例如,ExecutorService)来为你处理那些低级别的细节。使用 ExecutorService 将任务发送到线程池让池和 最终 OS 线程调度程序处理 threads/task 之间的映射。

顺便说一句,即使使用 Java 线程池,也不能保证线程会很好地映射到单独的内核。

其他语言,例如 C/C++ 可能会为您提供允许您将核心显式映射到线程的库,例如使用 OpenMP thread affinity features.


根据您的评论:

If I only send the tasks to a Thread Pool without managing the arrayList, it will consume a lot of memory. Suppose 1 have 100 tasks then it will need 100 arrayLists.. I want to reduce the memory consumption by using only 4 arrayList to be reused whenever a task is completed.

我觉得您想实施 producer-consumer pattern。一个线程将负责将工作添加到队列(即 ArrayLists),其他线程会从队列中请求工作并使用它。

如前所述@dreamcrash我们无法将线程映射到核心。但是,您可以使用 ExecutorService 来完成您的任务。在这种情况下,您将为每个任务使用线程,您的代码将如下所示:

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Cores {

    private static ExecutorService service = Executors.newFixedThreadPool(4);

    public static void main(String[] args) {
        List<String> first = List.of("1", "2", "3");
        List<String> second = List.of("1", "2", "3");
        List<String> third = List.of("1", "2", "3");
        List<String> fourth = List.of("1", "2", "3");
        List<String> fifth = List.of("1", "2", "3");

        sendToExecutor(first, second, third, fourth, fifth);

        service.shutdown();
    }

    @SafeVarargs
    public static void sendToExecutor(List<String>... lists) {
        for (List<String> list : lists) {
            service.execute(() -> handleWholeList(list));
        }
    }

    public static void handleWholeList(List<String> strings) {
        strings.forEach(s -> System.out.printf("String %s handled in thread %s%n", s, Thread.currentThread().getName()));
    }
}

而不是硬编码 4 个线程:

private static ExecutorService service = Executors.newFixedThreadPool(4);

您可以使用可用处理器的数量:

private static ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());