Java ExecutorService invokeAll 多任务解析

Java ExecutorService invokeAll multiple task resolv

我正在使用执行器服务并行执行 运行 任务。并行 运行ning 方法采用输入整数和 returns 整数。由于并行任务具有 return 类型,因此我使用了 Callable anonymous class 。您可以在下面的示例中看到 ExecutorServiceExample task(int i ) 是从 executer 调用的。任务方法也有 1 秒的等待时间并抛出 i==7;

的异常

在下面的实现中,我使用 invokeAll 并使用 isDone 并开始尝试收集数据。

下面的程序抛出 IllegalMonitorStateException .

Future 任务迭代和检查 isDone 和 get() 有什么问题。如何处理特定调用的异常。我想 运行 所有 1 到 14 任务并行并全部完成收集 return return 类型。另外,如果出现错误,应该知道它抛出异常的输入,例如(7 和 14)

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

class MyException extends Exception{
    MyException(String message) {
        super(message);
    }
}

public class ExecutorServiceExample {

    public int task(int i) throws MyException, InterruptedException {
        System.out.println("Running task.."+i);
        wait(1000);
        if(i%7==0) {
            throw new MyException("multiple of 7 not allowed");
        }

        return i;
    }

    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        List<Callable<Integer>> tasks = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14).stream().map(id->{
           return new Callable<Integer>() {
               @Override
               public Integer call() throws Exception {
                   ExecutorServiceExample executorServiceExample = new ExecutorServiceExample();
                   return executorServiceExample.task(id);
               }
           };
        }).collect(Collectors.toList());
        
        try{
            List<Future<Integer>> results = executorService.invokeAll(tasks);

            for (Future<Integer> task: results) {
                if(task.isDone()){
                    System.out.println(task.get());
                }
            }

        }catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }finally {
            executorService.shutdown();
        }
    }
}

事实上,每个任务都会产生一个 IllegalMonitorStateException,因为您没有在 synchronized 块中调用 wait 方法:IllegalMonitorStateException on wait() call。也许你应该使用 sleep 而不是 wait.

ExecutionExceptionfuture#get 抛出。所以如果缩小try-catch的范围,实际上会捕获到14个异常:

           for (Future<Integer> task: results) {
                try {
                    System.out.println(task.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }

不知道你为什么要这样设计。但显然,它有很多问题。 i/7==0 or i % 7 ==0examples 不是锁,为什么要使用 wait? returns of 'invokeAll' 的 futures 必须完成,但在调用 get 时可能会出现异常。 这是你想要的吗?

import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class MyException extends Exception {
    MyException(String message) {
        super(message);
    }
}

public class ExecutorServiceExample {

    public int task(int i) throws MyException, InterruptedException {
        TimeUnit.MILLISECONDS.sleep(1000);
        if (i % 7 == 0) {
            throw new MyException("multiple of 7 not allowed");
        }

        return i;
    }

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        List<Callable<Integer>> tasks = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
                .map(id -> (Callable<Integer>) () -> {
                    ExecutorServiceExample executorServiceExample = new ExecutorServiceExample();
                    return executorServiceExample.task(id);
                }).collect(Collectors.toList());
        List<Future<Integer>> results = executorService.invokeAll(tasks);
        executorService.shutdown();
        for (Future<Integer> task : results) {
            try {
                System.out.println(task.get());
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }


    }
}