用于并行化方法的执行器任务
Executor tasks for parallelizing a method
所以我试图从磁盘中删除 listDir 中列出的 n 个文件,因此我将 listDir 分成 4 个部分,并让它从磁盘中并行删除。这里的目的是并行进行,以使其快速而不是顺序进行。 deleteObject(x,credential,token) 可以假设为 API 最终从磁盘中删除一个对象并且是一个原子操作。成功删除时 return 为真,否则为假
我这里没什么问题
- 因为我有 4 个并行方法,我正在通过 invokeAll 执行
Executors.newFixedThreadPool(4) 声明了 4 个线程
是否总是将 1 个线程分配给 1 个方法?
- 是否需要在 parallelDeleteOperation() 方法的 for 循环中同步并使用 volatile for 迭代器 'i'。我问这个的原因是假设第一个线程没有完成它的任务(删除 listDir1 并且 for 循环没有完成)并且假设在中途它进行了上下文切换并且第二个线程开始执行相同的任务(删除 listDir1)。只是想知道在这种情况下第二个线程是否可以获得 IndexOutOfBound 异常。
- 将列表分成 4 个部分并执行它比让多个线程对一个非常大的列表执行删除操作有什么好处吗?
如果 ExecutorService 的操作之一 returns false 那么整个 deleteMain() API 将 return false
private boolean deleteMain(String parent, List<Structure>
listDir, String place, String node, Sequence<String>
groups, String Uid) throws IOException {
int noCores = Runtime.getRuntime().availableProcessors();
List<List<Integer>> splittedList = splitList(listDir, noCores);
System.out.println(splittedList.size());
System.out.println("NoOfCores" + noCores);
Set<Callable<Boolean>> callables = new HashSet<Callable<Boolean>>();
for (int i = 0; i < splittedList.size(); i++) {
List<Integer> l = splittedList.get(i);
callables.add(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return parallelDeleteOperation(parent, listDir, place, node, groups Uid);
}
});
}
ExecutorService service = Executors.newFixedThreadPool(noCores);
try {
List<Future<Boolean>> futures = service.invokeAll(callables);
for (Future<Boolean> future : futures) {
if (future.get() != true)
return future.get();
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
service.shutdown();
return true;
}
private Boolean parallelDeleteOperation(String parent, List<Structure>
listDir, String place, String node, Sequence<String>
groups, String Uid) throws IOException {
for (int i = 0; i < listDir.size(); i++) {
final String name = listDir.get(i).filename;
final String filePath = "/" + (parent.isEmpty() ? "" : (parent +
"/")) + name;
final DeleteMessage message = new DeleteMessage(name, place, node
filePath);
final boolean Status = delete(message, groups, Uid, place);
if (Status != true)
return Status;
}
return true;
}
- So as I have 4 parallel method which I am executing though invokeAll and there are 4 threads declared by Executors.newFixedThreadPool(4) would there always be 1 thread be assigned to 1 method?
通常情况下任务数量等于池中的线程数是正确的,但不能保证。这在很大程度上取决于池中的任务。
Executors.newFixedThreadPool(4).invokeAll(IntStream.range(0, 8).mapToObj(i -> (Callable<Integer>) () -> {
System.out.println(Thread.currentThread().getName() + ": " + i);
return 0;
}).collect(Collectors.toList()));
如果池中还有更多任务,如上,输出可以如下。没有明显的可预测规则来处理任务:
pool-1-thread-1: 0
pool-1-thread-2: 1
pool-1-thread-3: 2
pool-1-thread-1: 4
pool-1-thread-1: 5
pool-1-thread-1: 6
pool-1-thread-1: 7
pool-1-thread-4: 3
- Do I need to synchronize and use volatile for iterator 'i' in the for loop of parallelDeleteOperation() method.
不,你不需要。您已经将原始列表拆分为单独的 四个列表。
在您的代码中:
final List listDir1 = listDir.subList(0, listDir.size() / 4);
关于你的第三个问题:
- Is there any advantage of dividing list in 4 parts and executing this rather than having multiple threads executing delete operation on a very big list.
你最好在现实条件下进行一些测试。这太复杂了,不能只说它好还是不好。
当您在一个大列表上删除时,竞争条件可能比事先拆分列表更严重,这可能会产生额外的开销。
此外,即使没有任何并行性,它的性能也不会差。当涉及到 多用户 系统时,由于线程上下文切换开销,并行版本可能比顺序版本更差。
你要测试它们,测试助手代码可以是:
Long start = 0L;
List<Long> list = new ArrayList<>();
for (int i = 0; i < 1_000; ++i) {
start = System.nanoTime();
// your method to be tested;
list.add(System.nanoTime() - start);
}
System.out.println("Time cost summary: " + list.stream().collect(Collectors.summarizingLong(Long::valueOf)));
- If one of the operation of ExecutorService returns false then on the whole deleteMain() API would return false
我想重构您的代码,使其更清晰,同时满足您最后的要求(第 4 条):
// using the core as the count;
// since your task is CPU-bound, we can directly use parallelStream;
private static void testThreadPool(List<Integer> listDir) {
// if one of the tasks failed, you got isFailed == true;
boolean isFailed = splitList(listDir, Runtime.getRuntime().availableProcessors()).stream()
.parallel().map(YourClass::parallelDeleteOperation).anyMatch(ret -> ret == false); // if any is false, it gives you false
}
// split up the list into "count" lists;
private static <T> List<List<T>> splitList(List<T> list, int count) {
List<List<T>> listList = new ArrayList<>();
for (int i = 0, blockSize = list.size() / count; i < count; ++i) {
listList.add(list.subList(i * blockSize, Math.min((i+1) * blockSize, list.size()));
}
return listList;
}
我看不出将列表分成 4 个子列表有什么好处:一旦线程终止其列表,它就会空闲。如果为输入列表中的每个元素提交一个任务,则所有四个线程都将处于活动状态,直到队列为空。
更新:
正如有人指出的那样,您可以使用 parallelStream
,它更简单而且可能更快;但是如果你想保留 ExecutorService
,你可以这样做:
int noCores = Runtime.getRuntime().availableProcessors();
List<Future<Boolean>> futures = new ArrayList<>();
ExecutorService service = Executors.newFixedThreadPool(noCores);
try {
for (Structure s: listDir) {
String name = s.filename;
String filePath = "/" + (parent.isEmpty() ? "" : (parent
+ "/")) + name;
Future<Boolean> result = service.submit(()-> {
final DeleteMessage message = new DeleteMessage(
name, place, node, filePath);
return delete(message, groups, Uid, place);
});
futures.add(result);
}
} finally {
service.shutdown();
}
所以我试图从磁盘中删除 listDir 中列出的 n 个文件,因此我将 listDir 分成 4 个部分,并让它从磁盘中并行删除。这里的目的是并行进行,以使其快速而不是顺序进行。 deleteObject(x,credential,token) 可以假设为 API 最终从磁盘中删除一个对象并且是一个原子操作。成功删除时 return 为真,否则为假
我这里没什么问题
- 因为我有 4 个并行方法,我正在通过 invokeAll 执行 Executors.newFixedThreadPool(4) 声明了 4 个线程 是否总是将 1 个线程分配给 1 个方法?
- 是否需要在 parallelDeleteOperation() 方法的 for 循环中同步并使用 volatile for 迭代器 'i'。我问这个的原因是假设第一个线程没有完成它的任务(删除 listDir1 并且 for 循环没有完成)并且假设在中途它进行了上下文切换并且第二个线程开始执行相同的任务(删除 listDir1)。只是想知道在这种情况下第二个线程是否可以获得 IndexOutOfBound 异常。
- 将列表分成 4 个部分并执行它比让多个线程对一个非常大的列表执行删除操作有什么好处吗?
如果 ExecutorService 的操作之一 returns false 那么整个 deleteMain() API 将 return false
private boolean deleteMain(String parent, List<Structure> listDir, String place, String node, Sequence<String> groups, String Uid) throws IOException { int noCores = Runtime.getRuntime().availableProcessors(); List<List<Integer>> splittedList = splitList(listDir, noCores); System.out.println(splittedList.size()); System.out.println("NoOfCores" + noCores); Set<Callable<Boolean>> callables = new HashSet<Callable<Boolean>>(); for (int i = 0; i < splittedList.size(); i++) { List<Integer> l = splittedList.get(i); callables.add(new Callable<Boolean>() { @Override public Boolean call() throws Exception { return parallelDeleteOperation(parent, listDir, place, node, groups Uid); } }); } ExecutorService service = Executors.newFixedThreadPool(noCores); try { List<Future<Boolean>> futures = service.invokeAll(callables); for (Future<Boolean> future : futures) { if (future.get() != true) return future.get(); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } service.shutdown(); return true; } private Boolean parallelDeleteOperation(String parent, List<Structure> listDir, String place, String node, Sequence<String> groups, String Uid) throws IOException { for (int i = 0; i < listDir.size(); i++) { final String name = listDir.get(i).filename; final String filePath = "/" + (parent.isEmpty() ? "" : (parent + "/")) + name; final DeleteMessage message = new DeleteMessage(name, place, node filePath); final boolean Status = delete(message, groups, Uid, place); if (Status != true) return Status; } return true; }
- So as I have 4 parallel method which I am executing though invokeAll and there are 4 threads declared by Executors.newFixedThreadPool(4) would there always be 1 thread be assigned to 1 method?
通常情况下任务数量等于池中的线程数是正确的,但不能保证。这在很大程度上取决于池中的任务。
Executors.newFixedThreadPool(4).invokeAll(IntStream.range(0, 8).mapToObj(i -> (Callable<Integer>) () -> {
System.out.println(Thread.currentThread().getName() + ": " + i);
return 0;
}).collect(Collectors.toList()));
如果池中还有更多任务,如上,输出可以如下。没有明显的可预测规则来处理任务:
pool-1-thread-1: 0
pool-1-thread-2: 1
pool-1-thread-3: 2
pool-1-thread-1: 4
pool-1-thread-1: 5
pool-1-thread-1: 6
pool-1-thread-1: 7
pool-1-thread-4: 3
- Do I need to synchronize and use volatile for iterator 'i' in the for loop of parallelDeleteOperation() method.
不,你不需要。您已经将原始列表拆分为单独的 四个列表。
在您的代码中:
final List listDir1 = listDir.subList(0, listDir.size() / 4);
关于你的第三个问题:
- Is there any advantage of dividing list in 4 parts and executing this rather than having multiple threads executing delete operation on a very big list.
你最好在现实条件下进行一些测试。这太复杂了,不能只说它好还是不好。
当您在一个大列表上删除时,竞争条件可能比事先拆分列表更严重,这可能会产生额外的开销。
此外,即使没有任何并行性,它的性能也不会差。当涉及到 多用户 系统时,由于线程上下文切换开销,并行版本可能比顺序版本更差。
你要测试它们,测试助手代码可以是:
Long start = 0L;
List<Long> list = new ArrayList<>();
for (int i = 0; i < 1_000; ++i) {
start = System.nanoTime();
// your method to be tested;
list.add(System.nanoTime() - start);
}
System.out.println("Time cost summary: " + list.stream().collect(Collectors.summarizingLong(Long::valueOf)));
- If one of the operation of ExecutorService returns false then on the whole deleteMain() API would return false
我想重构您的代码,使其更清晰,同时满足您最后的要求(第 4 条):
// using the core as the count;
// since your task is CPU-bound, we can directly use parallelStream;
private static void testThreadPool(List<Integer> listDir) {
// if one of the tasks failed, you got isFailed == true;
boolean isFailed = splitList(listDir, Runtime.getRuntime().availableProcessors()).stream()
.parallel().map(YourClass::parallelDeleteOperation).anyMatch(ret -> ret == false); // if any is false, it gives you false
}
// split up the list into "count" lists;
private static <T> List<List<T>> splitList(List<T> list, int count) {
List<List<T>> listList = new ArrayList<>();
for (int i = 0, blockSize = list.size() / count; i < count; ++i) {
listList.add(list.subList(i * blockSize, Math.min((i+1) * blockSize, list.size()));
}
return listList;
}
我看不出将列表分成 4 个子列表有什么好处:一旦线程终止其列表,它就会空闲。如果为输入列表中的每个元素提交一个任务,则所有四个线程都将处于活动状态,直到队列为空。
更新:
正如有人指出的那样,您可以使用 parallelStream
,它更简单而且可能更快;但是如果你想保留 ExecutorService
,你可以这样做:
int noCores = Runtime.getRuntime().availableProcessors();
List<Future<Boolean>> futures = new ArrayList<>();
ExecutorService service = Executors.newFixedThreadPool(noCores);
try {
for (Structure s: listDir) {
String name = s.filename;
String filePath = "/" + (parent.isEmpty() ? "" : (parent
+ "/")) + name;
Future<Boolean> result = service.submit(()-> {
final DeleteMessage message = new DeleteMessage(
name, place, node, filePath);
return delete(message, groups, Uid, place);
});
futures.add(result);
}
} finally {
service.shutdown();
}