改进 ExecutorService 以在超过 1 个 cpu 上执行进程

Improve ExecutorService to execute process on more then 1 cpu

当我运行下面的代码时,javaw.exe似乎只使用了一个核心。

int cores = Runtime.getRuntime().availableProcessors();
System.out.println("Number of cores: " + cores); //8 cores

int partitionSize = alphabet.length / cores;
ExecutorService service = Executors.newFixedThreadPool(cores);
List<Future> futures = new ArrayList<Future>();

for (int c = 0; c < cores; c++) {

    char[] part = Arrays.copyOfRange(alphabet, c * partitionSize, (c + 1) * partitionSize);
    futures.add(service.submit(new BruteWorker(part) ));

}

for(Future f : futures)
    f.get();

根据这张来自 ProcessExplorer 的图片,我认为我的应用程序只使用 1 CPU 是否正确?如果是,我该怎么做才能使用我所有的 8 个核心?总 CPU 利用率为 12.5,对应于 100% / 8cores

这是我从一些 println 中得到的:

Number of cores: 8
New thread (id = 11)
New thread (id = 12)
New thread (id = 13)
New thread (id = 14)
New thread (id = 16)
New thread (id = 15)
New thread (id = 17)
New thread (id = 18)
10000000 tries on thread id = 12
20000000 tries on thread id = 12
30000000 tries on thread id = 12
40000000 tries on thread id = 12
50000000 tries on thread id = 12
60000000 tries on thread id = 12

为什么其他线程什么都不做?

BruteWorker class :

public class BruteWorker implements Runnable {
    private static final char[] alphabet = "eaistnrulodmpcvqgbfjhzxykw0123456789!@#$%&*".toCharArray();
    private static final int maxLength = 8;
    private static Map<String, String> hashes;
    private static MessageDigest md ;
    private char[] partition;
    private long nbTries = 0;

    BruteWorker(char[] partition, Map<String, String> hashes) {
        this.partition = partition;
        this.hashes = hashes;
    }

    public void run() {
        System.out.println("New thread (id = "+ Thread.currentThread().getId() +")");
        try {
            md = MessageDigest.getInstance("MD5");
            for(char c : this.partition){
                stringPossibilities(String.valueOf(c), maxLength);
            }
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        }
        System.out.println("End of thread (id = "+ Thread.currentThread().getId() +")");
    }

    //Recursive class
    private void stringPossibilities(String prefix, int length) {
        nbTries++;
        if((nbTries % 10000000) == 0){
            System.out.println(nbTries + " tries on thread id = "+ Thread.currentThread().getId());
        }
        md.update(prefix.getBytes());
        byte[] bytes = md.digest();
        String md5 = getMd5Hash(prefix);

        if (hashes.containsKey(md5)){
            System.out.println(prefix + " = " + md5);
        }


        if (prefix.length() < length){
            for (int i = 0; i < alphabet.length; i++){
                char c = alphabet[i];
                stringPossibilities(prefix + c, length);
            }
        }
    }

    private String getMd5Hash(String prefix){
        md.update(prefix.getBytes());
        byte[] bytes = md.digest();
        StringBuilder md5 = new StringBuilder();
        for(int j =0; j < bytes.length; j++){
            md5.append(Integer.toString((bytes[j] & 0xff) + 0x100, 16).substring(1));
        }
        return md5.toString();
    }

}

正在努力查看这里到底发生了什么,如下所示,可以同时工作并且 运行s 就好了:

public class Main {
    private static final char[] alphabet = "eaistnrulodmpcvqgbfjhzxykw0123456789!@#$%&*".toCharArray();
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("Number of cores: " + cores); //8 cores

        int partitionSize = alphabet.length / cores;
        ExecutorService service = Executors.newFixedThreadPool(cores);
        List<Future> futures = new ArrayList<Future>();

        for (int c = 0; c < cores; c++) {

            char[] part = Arrays.copyOfRange(alphabet, c * partitionSize, (c + 1) * partitionSize);
            futures.add(service.submit(new BruteWorker(part)));

        }

        for(Future f : futures)
            f.get();

        service.shutdown();
        System.out.println("Completed normally");
    }
    public static class BruteWorker implements Runnable {
        private char[] partition;


        BruteWorker(char[] partition) {
            this.partition = partition;
        }

        public void run() {
            System.out.println("New thread (id = "+ Thread.currentThread().getId() +")");
            for (long nbTries = 0; nbTries < 1_000_000_000L; nbTries ++ ) {
                if((nbTries % 10_000_000) == 0){
                    System.out.println(nbTries + " tries on thread id = "+ Thread.currentThread().getId());
                }
            }
            System.out.println("End of thread (id = "+ Thread.currentThread().getId() +")");
        }
    }    
}

Also available (with output) as a gist.

请注意,这是对您的代码的砍伐。

我确实注意到您的代码确实使用了对 MessageDigestMap<String,String> 的静态引用。有可能您的任务实际上在其他线程上失败了,而您只是没有发现这一点,因为您在等待其他长时间 运行ning 任务之一时受阻。

尝试在您的 运行 方法中捕获所有异常并转储堆栈跟踪,这可能会有所启发:

public void run() {
    System.out.println("New thread (id = "+ Thread.currentThread().getId() +")");
    try {
        md = MessageDigest.getInstance("MD5");
        for(char c : this.partition){
            stringPossibilities(String.valueOf(c), maxLength);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        System.out.println("End of thread (id = "+ Thread.currentThread().getId() +")");
    }
}