如何在 for 循环中创建多个线程

How to create multiple threads inside of a for loop

嗨,我对 Java 有点陌生。

我有一个接受映射的方法,并将映射中的每个键值对写入文件。

我想在映射 运行ning 中为每个键值对创建一个线程,这样我就可以同时创建多个文件。不确定执行此操作的正确方法是什么或如何使用执行程序服务来完成此操作。

这是我正在尝试的一个非常简单的示例。在示例中,我只是使用 system.out.println 而不是编写用于编写文件的所有代码:

public class CityWriter
{

 public static void main(String []args)
 {
    LinkedHashMap<Integer, ArrayList<City>> stateNumCitiesMap = new LinkedHashMap<Integer, ArrayList<City>>();

    stateNumCitiesMap  = retrieveStateCitiesMap();

    int numOfThreadsToExecuteAtATime = 10;
    
    ExecutorService executor = Executors.newFixedThreadPool(numOfThreadsToExecuteAtATime);
    
    for(Integer key : stateNumCitiesMap.keySet()) //Could have up to 50 key,values in map
    {   
        executor.execute(writeCitiesOfStateToFile(key, StateNumCitiesMap.get(key)));
    }
    
    executor.shutdown();

 }

public LinkedHashMap<Integer, ArrayList<Cities>> writeCitiesOfStateToFile(int stateNum, List<City> citiesList) 
{
    for(City city : citiesList)
    {
        System.out.println(stateNum +" "+ city);
    }
}

}//end of class

我的问题是这里似乎不是并行执行线程。此外,我不想一次 运行 超过 10 个线程,即使 for 循环将调用执行程序 50 次。

请告诉我最有效的方法是什么。

实际上,如果我很好地理解了你的问题,你的代码就完全符合你的要求(当然,如果我们忽略了你代码片段中的所有语法错误):

  • 它不会产生超过 10 个线程,因为您已在此处指定 Executors.newFixedThreadPool(10) 您想要多少个线程
  • 您所有的 x 映射条目都将作为潜在工作分配给执行者。然后执行器将 运行 它们中的每一个与所有 10 个线程并行(但一次不超过 10 个作业)

您可以尝试此代码段并检查是否有多个线程正在并行执行该工作:

    public static void main(String[] args) {
        Map<Integer, List<String>> stateNumCitiesMap = new LinkedHashMap<>();

        for (int i = 0; i < 100; i++) {
            stateNumCitiesMap.put(i, Collections.singletonList("ABC"));
        }

        ExecutorService executor = Executors.newFixedThreadPool(10);

        for (Integer key : stateNumCitiesMap.keySet()) {
            executor.execute(() -> writeCitiesOfStateToFile(key, stateNumCitiesMap.get(key)));
        }

        executor.shutdown();
    }

    public static void writeCitiesOfStateToFile(int stateNum, List<String> citiesList) {
        for (String city : citiesList) {
            System.out.println(stateNum + " " + Thread.currentThread().getName());
        }
    }

如果你不想一个一个地交给执行者,你可以一次传递一批。

 public static void main(String[] args) throws InterruptedException {
        Map<Integer, List<String>> stateNumCitiesMap = new LinkedHashMap<>();

        for (int i = 0; i < 100; i++) {
            stateNumCitiesMap.put(i, Collections.singletonList("ABC"));
        }

        ExecutorService executor = Executors.newFixedThreadPool(10);

        List<Callable<Void>> jobs = new ArrayList<>();
        for (Integer key : stateNumCitiesMap.keySet()) {
            jobs.add(() -> {
                writeCitiesOfStateToFile(key, stateNumCitiesMap.get(key));
                return null;
            });
        }
        executor.invokeAll(jobs);

        executor.shutdown();
    }

    public static void writeCitiesOfStateToFile(int stateNum, List<String> citiesList) {
        for (String city : citiesList) {
            System.out.println(stateNum + " " + Thread.currentThread().getName());
        }
    }

在 java 中,您需要为您希望在线程中 运行 的任何对象提供 运行nable 接口,您没有这样做,这就是执行程序期待。

 executor.execute(() -> your function )

实际上是

 executor.execute(new Runnable() {
                @Override
                public void run() {
                    // your code 
                }
            });

该方法没有实现运行nables,只有在运行nable的运行方法中才会线程化

原因是执行器使用了一种观察者模式,你订阅了运行nable,然后执行器运行s 运行方法

来自 java 文档:

Runnable 接口应该由任何 class 实现,其实例旨在由线程执行。 class 必须定义一个名为 运行 的无参数方法。 此接口旨在为希望在活动时执行代码的对象提供通用协议。比如Runnable是由classThread实现的。处于活动状态仅表示线程已启动且尚未停止。

也可以使方法 return 成为 运行 自身可用

 public static Runnable writeCitiesOfStateToFile(params) {

        return  () -> System.out.println(params);
    }

您可以使用“invokeAll”方法进行多次执行,甚至可以得到它们的结果(完成与否)。即使他们是 50 个,它也会为他们使用 10 个线程。当所有任务完成时,将返回结果。像下面这样的,当成伪的吧。

Callable<int> callableTask = (fileName) -> {
// implement write to the file
        return 0;
    };
ExecutorService executor = Executors.newFixedThreadPool(10);
     
List<Callable<int>> tasksList;
for(City city : citiesList)
{
    tasksList.add(callableTask(city.toString()));
}

executor.invokeAll(tasksList);

Executor#execute可能是同步的

你说:

it doesn't seem like it is executing threads in parallel here

你没有解释这种看法的原因。

但是,仅供参考,情况可能确实如此。你叫 execute method on your ExecutorService.

    for(Integer key : stateNumCitiesMap.keySet()) //Could have up to 50 key,values in map
    {   
        executor.execute(writeCitiesOfStateToFile(key, StateNumCitiesMap.get(key)));
    }

execute method is inherited from the Executor接口,ExecutorService的超级接口。该接口及其方法被记录为 maybe 运行 异步执行您的任务。引用 Javadoc:

The command may execute in a new thread, in a pooled thread, or in the calling thread, at the discretion of the Executor implementation.

所以您可能确实看到了顺序非线程同步执行,而不是异步执行。

根据我对 ExecutorService 方法 submitinvokeAllinvokeAny 的阅读,这些方法似乎总是 运行 异步。

考虑到您选择的 ExecutorService 实施方式,我不相信会发生这种同步行为。您对 Executors.newFixedThreadPool 的调用会生成一个 ThreadPoolExecutor 类型的对象。 Looking briefly at the source code 那个具体的 class' execute 方法,它似乎总是异步工作(虽然我不完全确定)。

然而,似乎我们应该在使用Executor#execute.

时总是假定异步执行