改进 ExecutorService 以在超过 1 个 cpu 上执行进程
Improve ExecutorService to execute process on more then 1 cpu
当我运行下面的代码时,javaw.exe似乎只使用了一个核心。
int cores = Runtime.getRuntime().availableProcessors();
System.out.println("Number of cores: " + cores); //8 cores
int partitionSize = alphabet.length / cores;
ExecutorService service = Executors.newFixedThreadPool(cores);
List<Future> futures = new ArrayList<Future>();
for (int c = 0; c < cores; c++) {
char[] part = Arrays.copyOfRange(alphabet, c * partitionSize, (c + 1) * partitionSize);
futures.add(service.submit(new BruteWorker(part) ));
}
for(Future f : futures)
f.get();
根据这张来自 ProcessExplorer 的图片,我认为我的应用程序只使用 1 CPU 是否正确?如果是,我该怎么做才能使用我所有的 8 个核心?总 CPU 利用率为 12.5,对应于 100% / 8cores
这是我从一些 println 中得到的:
Number of cores: 8
New thread (id = 11)
New thread (id = 12)
New thread (id = 13)
New thread (id = 14)
New thread (id = 16)
New thread (id = 15)
New thread (id = 17)
New thread (id = 18)
10000000 tries on thread id = 12
20000000 tries on thread id = 12
30000000 tries on thread id = 12
40000000 tries on thread id = 12
50000000 tries on thread id = 12
60000000 tries on thread id = 12
为什么其他线程什么都不做?
BruteWorker class :
public class BruteWorker implements Runnable {
private static final char[] alphabet = "eaistnrulodmpcvqgbfjhzxykw0123456789!@#$%&*".toCharArray();
private static final int maxLength = 8;
private static Map<String, String> hashes;
private static MessageDigest md ;
private char[] partition;
private long nbTries = 0;
BruteWorker(char[] partition, Map<String, String> hashes) {
this.partition = partition;
this.hashes = hashes;
}
public void run() {
System.out.println("New thread (id = "+ Thread.currentThread().getId() +")");
try {
md = MessageDigest.getInstance("MD5");
for(char c : this.partition){
stringPossibilities(String.valueOf(c), maxLength);
}
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
System.out.println("End of thread (id = "+ Thread.currentThread().getId() +")");
}
//Recursive class
private void stringPossibilities(String prefix, int length) {
nbTries++;
if((nbTries % 10000000) == 0){
System.out.println(nbTries + " tries on thread id = "+ Thread.currentThread().getId());
}
md.update(prefix.getBytes());
byte[] bytes = md.digest();
String md5 = getMd5Hash(prefix);
if (hashes.containsKey(md5)){
System.out.println(prefix + " = " + md5);
}
if (prefix.length() < length){
for (int i = 0; i < alphabet.length; i++){
char c = alphabet[i];
stringPossibilities(prefix + c, length);
}
}
}
private String getMd5Hash(String prefix){
md.update(prefix.getBytes());
byte[] bytes = md.digest();
StringBuilder md5 = new StringBuilder();
for(int j =0; j < bytes.length; j++){
md5.append(Integer.toString((bytes[j] & 0xff) + 0x100, 16).substring(1));
}
return md5.toString();
}
}
正在努力查看这里到底发生了什么,如下所示,可以同时工作并且 运行s 就好了:
public class Main {
private static final char[] alphabet = "eaistnrulodmpcvqgbfjhzxykw0123456789!@#$%&*".toCharArray();
public static void main(String[] args) throws ExecutionException, InterruptedException {
int cores = Runtime.getRuntime().availableProcessors();
System.out.println("Number of cores: " + cores); //8 cores
int partitionSize = alphabet.length / cores;
ExecutorService service = Executors.newFixedThreadPool(cores);
List<Future> futures = new ArrayList<Future>();
for (int c = 0; c < cores; c++) {
char[] part = Arrays.copyOfRange(alphabet, c * partitionSize, (c + 1) * partitionSize);
futures.add(service.submit(new BruteWorker(part)));
}
for(Future f : futures)
f.get();
service.shutdown();
System.out.println("Completed normally");
}
public static class BruteWorker implements Runnable {
private char[] partition;
BruteWorker(char[] partition) {
this.partition = partition;
}
public void run() {
System.out.println("New thread (id = "+ Thread.currentThread().getId() +")");
for (long nbTries = 0; nbTries < 1_000_000_000L; nbTries ++ ) {
if((nbTries % 10_000_000) == 0){
System.out.println(nbTries + " tries on thread id = "+ Thread.currentThread().getId());
}
}
System.out.println("End of thread (id = "+ Thread.currentThread().getId() +")");
}
}
}
Also available (with output) as a gist.
请注意,这是对您的代码的砍伐。
我确实注意到您的代码确实使用了对 MessageDigest
和 Map<String,String>
的静态引用。有可能您的任务实际上在其他线程上失败了,而您只是没有发现这一点,因为您在等待其他长时间 运行ning 任务之一时受阻。
尝试在您的 运行 方法中捕获所有异常并转储堆栈跟踪,这可能会有所启发:
public void run() {
System.out.println("New thread (id = "+ Thread.currentThread().getId() +")");
try {
md = MessageDigest.getInstance("MD5");
for(char c : this.partition){
stringPossibilities(String.valueOf(c), maxLength);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("End of thread (id = "+ Thread.currentThread().getId() +")");
}
}
当我运行下面的代码时,javaw.exe似乎只使用了一个核心。
int cores = Runtime.getRuntime().availableProcessors();
System.out.println("Number of cores: " + cores); //8 cores
int partitionSize = alphabet.length / cores;
ExecutorService service = Executors.newFixedThreadPool(cores);
List<Future> futures = new ArrayList<Future>();
for (int c = 0; c < cores; c++) {
char[] part = Arrays.copyOfRange(alphabet, c * partitionSize, (c + 1) * partitionSize);
futures.add(service.submit(new BruteWorker(part) ));
}
for(Future f : futures)
f.get();
根据这张来自 ProcessExplorer 的图片,我认为我的应用程序只使用 1 CPU 是否正确?如果是,我该怎么做才能使用我所有的 8 个核心?总 CPU 利用率为 12.5,对应于 100% / 8cores
这是我从一些 println 中得到的:
Number of cores: 8
New thread (id = 11)
New thread (id = 12)
New thread (id = 13)
New thread (id = 14)
New thread (id = 16)
New thread (id = 15)
New thread (id = 17)
New thread (id = 18)
10000000 tries on thread id = 12
20000000 tries on thread id = 12
30000000 tries on thread id = 12
40000000 tries on thread id = 12
50000000 tries on thread id = 12
60000000 tries on thread id = 12
为什么其他线程什么都不做?
BruteWorker class :
public class BruteWorker implements Runnable {
private static final char[] alphabet = "eaistnrulodmpcvqgbfjhzxykw0123456789!@#$%&*".toCharArray();
private static final int maxLength = 8;
private static Map<String, String> hashes;
private static MessageDigest md ;
private char[] partition;
private long nbTries = 0;
BruteWorker(char[] partition, Map<String, String> hashes) {
this.partition = partition;
this.hashes = hashes;
}
public void run() {
System.out.println("New thread (id = "+ Thread.currentThread().getId() +")");
try {
md = MessageDigest.getInstance("MD5");
for(char c : this.partition){
stringPossibilities(String.valueOf(c), maxLength);
}
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
System.out.println("End of thread (id = "+ Thread.currentThread().getId() +")");
}
//Recursive class
private void stringPossibilities(String prefix, int length) {
nbTries++;
if((nbTries % 10000000) == 0){
System.out.println(nbTries + " tries on thread id = "+ Thread.currentThread().getId());
}
md.update(prefix.getBytes());
byte[] bytes = md.digest();
String md5 = getMd5Hash(prefix);
if (hashes.containsKey(md5)){
System.out.println(prefix + " = " + md5);
}
if (prefix.length() < length){
for (int i = 0; i < alphabet.length; i++){
char c = alphabet[i];
stringPossibilities(prefix + c, length);
}
}
}
private String getMd5Hash(String prefix){
md.update(prefix.getBytes());
byte[] bytes = md.digest();
StringBuilder md5 = new StringBuilder();
for(int j =0; j < bytes.length; j++){
md5.append(Integer.toString((bytes[j] & 0xff) + 0x100, 16).substring(1));
}
return md5.toString();
}
}
正在努力查看这里到底发生了什么,如下所示,可以同时工作并且 运行s 就好了:
public class Main {
private static final char[] alphabet = "eaistnrulodmpcvqgbfjhzxykw0123456789!@#$%&*".toCharArray();
public static void main(String[] args) throws ExecutionException, InterruptedException {
int cores = Runtime.getRuntime().availableProcessors();
System.out.println("Number of cores: " + cores); //8 cores
int partitionSize = alphabet.length / cores;
ExecutorService service = Executors.newFixedThreadPool(cores);
List<Future> futures = new ArrayList<Future>();
for (int c = 0; c < cores; c++) {
char[] part = Arrays.copyOfRange(alphabet, c * partitionSize, (c + 1) * partitionSize);
futures.add(service.submit(new BruteWorker(part)));
}
for(Future f : futures)
f.get();
service.shutdown();
System.out.println("Completed normally");
}
public static class BruteWorker implements Runnable {
private char[] partition;
BruteWorker(char[] partition) {
this.partition = partition;
}
public void run() {
System.out.println("New thread (id = "+ Thread.currentThread().getId() +")");
for (long nbTries = 0; nbTries < 1_000_000_000L; nbTries ++ ) {
if((nbTries % 10_000_000) == 0){
System.out.println(nbTries + " tries on thread id = "+ Thread.currentThread().getId());
}
}
System.out.println("End of thread (id = "+ Thread.currentThread().getId() +")");
}
}
}
Also available (with output) as a gist.
请注意,这是对您的代码的砍伐。
我确实注意到您的代码确实使用了对 MessageDigest
和 Map<String,String>
的静态引用。有可能您的任务实际上在其他线程上失败了,而您只是没有发现这一点,因为您在等待其他长时间 运行ning 任务之一时受阻。
尝试在您的 运行 方法中捕获所有异常并转储堆栈跟踪,这可能会有所启发:
public void run() {
System.out.println("New thread (id = "+ Thread.currentThread().getId() +")");
try {
md = MessageDigest.getInstance("MD5");
for(char c : this.partition){
stringPossibilities(String.valueOf(c), maxLength);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("End of thread (id = "+ Thread.currentThread().getId() +")");
}
}