如何对我的顺序 Java 代码进行多线程处理

How to multi-thread my sequential Java code

我有一个 Java 程序,它给定一个列表,对每个列表项执行一些 独立的 过程(包括从一些 HTTP 资源中检索文本并将它们插入到一个独立的 HashMap),最后在这些 HashMap 上计算一些数字。主要片段如下所示:

    for (int i = 0; i < mylist.size(); i++) {
        long startepoch = getTime(mylist.get(i).time);
        MyItem m = mylist.get(i);
        String index=(i+1)+"";

        process1(index, m.name, startepoch, m.duration);
        //adds to hashmap1

        if(m.name.equals("TEST")) {
            process2(index, m.name, startepoch, m.duration);
        //adds to hashmap2

        } else {
            process3(index, m.name, startepoch, m.duration);
        //adds to hashmap3
            process4(index, m.name, startepoch, m.duration);
        //adds to hashmap4
            process5(index, m.name, startepoch, m.duration);
        //adds to hashmap5
            process6(index, m.name, startepoch, m.duration);
        //adds to hashmap6
        }
    }

    // then start calculation on all hashmaps
    calculate_all();

由于目前此代码段是按顺序执行的,因此对于包含 500 个项目的列表,这可能需要 30 分钟左右的时间。我怎样才能对我的代码进行多线程处理以使其更快?并以线程安全的方式?

我尝试使用 ExecutorService executorService = Executors.newFixedThreadPool(10);,然后通过如下方式包装将每个进程提交给 executorService,但问题是我不知道他们什么时候完成,所以要调用 calculate_all()。所以我没有继续。

            executorService.submit(new Runnable() {
                public void run() {
                    process2(index, m.name, startepoch, m.duration);
                }
            });

有更好的工作思路吗?

but the problem was I couldn't know when they finish

当您向 Executor 提交内容时,您会返回一个包含结果(如果有)的 Future

然后您可以从您的主线程调用 Future::get 以等待这些结果(或者只是您的情况的完成)。

List<Future<?>> completions = executor.invokeAll(tasks);

// later, when you need to wait for completion
for(Future<?> c: completions) c.get();

您需要注意的另一件事是如何存储结果。如果您计划让您的任务将它们放入某个共享数据结构中,请确保该任务是线程安全的。从 Runnable 更改为 Callable 可能更容易,以便任务可以 return 结果(稍后您可以在主线程上以单线程方式合并)。

请注意,多线程并不一定能提高速度。多线程主要用于通过防止不必要的睡眠等来减少空闲 CPU 周期。

对于您提供的内容,我无能为力,但是,我认为您可以先做这样的事情:

  1. 使用线程安全的数据结构。这是必须的。如果你错过了这个 步骤,你的软件最终会崩溃。你会有一个 很难找到原因。 (例如,如果你有一个 ArrayList, 使用线程安全的)
  2. 您可以通过删除 for 循环开始尝试多线程 并为每次执行使用一个线程。如果你的 for 循环 大小超过你的线程数,你将拥有 将它们排入队列。
  3. 您有一个最终计算需要所有其他线程 结束。您可以使用 CountDownLatch、wait()/notifyAll() 或 synchronized() 取决于你的实现。
  4. 执行你的最终计算。

编辑

回应 (2):

您当前的执行是这样的:

for (int i = 0; i < mylist.size(); i++) {
    some_processes();
}

// then start calculation on all hashmaps
calculate_all();

现在,要删除 "for" 循环,您可以先从增加 "for" 循环开始。例如:

// Assuming mylist.size() is around 500 and you want, say 5, hardcoded multi-thrads
Thread_1:
for (int i = 0; i < 100; i++) {
    some_processes();
}
Thread_2:
for (int i = 100; i < 200; i++) {
    some_processes();
}
Thread_3:
for (int i = 200; i < 300; i++) {
    some_processes();
}
Thread_4:
for (int i = 300; i < 400; i++) {
    some_processes();
}
Thread_5:
for (int i = 400; i < mylist.size(); i++) {
    some_processes();
}
// Now you can use these threads as such:
CountDownLatch latch = new CountDownLatch(5);
ExecutorService executor = Executors.newFixedThreadPool(5);
executor.submit(new Thread1(latch));
executor.submit(new Thread2(latch));
executor.submit(new Thread3(latch));
executor.submit(new Thread4(latch));
executor.submit(new Thread5(latch));
try {
    latch.await();  // wait until latch counted down to 0
} catch (InterruptedException e) {
    e.printStackTrace();
}
// then start calculation on all hashmaps
calculate_all();

如您所见,此方法有几个缺点。例如,如果列表大小变为 380 怎么办?然后你有一个空闲线程。另外,如果你想要超过 5 个线程怎么办?

所以在这一点上,您可以通过让循环越来越少来进一步增加 "for" 循环的数量。最多,"for loop count" == "thread count",有效地删除了你的 for 循环。所以从技术上讲,您需要 "mylist.size()" 个线程。您可以这样实现:

// Allow a maximum amount of threads, say mylist.size(). I used LinkedBlockingDeque here because you might choose something lower than mylist.size().
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(mylist.size());
CountDownLatch latch = new CountDownLatch(mylist.size());

new Thread(new add_some_processes_w_single_loop_for_loop_to_queue(queue, latch)).start();
new Thread(new take_finished_processes_from_queue(queue)).start();
try {
    latch.await();  // wait until latch counted down to 0
} catch (InterruptedException e) {
    e.printStackTrace();
}
// then start calculation on all hashmaps
calculate_all();

请注意,通过这种安排,我们已经删除了您的初始 "for" 循环,而是创建了另一个循环,该循环仅在队列为空时提交新线程。您可以使用生产者和消费者应用程序检查 BlockingQueue 示例。例如参见:BlockingQueue examples

编辑 2

Future 的简单实现可能如下所示:

ExecutorService executorService = Executors.newCachedThreadPool();  
Future future1, future2, future3, future4, future5, future6;  

for (int i = 0; i < mylist.size(); i++) {
    long startepoch = getTime(mylist.get(i).time);
    MyItem m = mylist.get(i);
    String index=(i+1)+"";

    future1 = executorService.submit(new Callable() {...})
    //adds to hashmap1

    future1.get(); // Add this if you need to wait for process1 to finish before moving on to others. Also, add a try{}catch{} block as shown below.

    if(m.name.equals("TEST")) {
        future2 = executorService.submit(new Callable() {...})
    //adds to hashmap2

        future2.get(); // Add this if you need to wait for process2 to finish before moving on to others. Also, add a try{}catch{} block as shown below.

    } else {
        future3 = executorService.submit(new Callable() {...})
    //adds to hashmap3
        future4 = executorService.submit(new Callable() {...})
    //adds to hashmap4
        future5 = executorService.submit(new Callable() {...})
    //adds to hashmap5
        future6 = executorService.submit(new Callable() {...})
    //adds to hashmap6

         // Add extra future.get here as above...
    }
}

// then start calculation on all hashmaps
calculate_all();

不要忘记添加 try-catch 块,否则您可能无法从异常中恢复并崩溃。

// Example try-catch block surrounding a Future.get().
try {
    Object result = future.get();       
} catch (ExecutionException e) {
    //Do something
} catch (InterruptedException e) {
    //Do something
}

但是,您可以有一个更复杂的,如图 here。 link 也解释了 Thilo 的回答。