可在 ExecutorService 中使用共享数据调用
Callable in ExecutorService with shared data
我有一个场景,我以某种方式模拟如下:
我有一个在 4 个线程中执行的可调用任务列表。执行应该return一个值(aMap<String, String>
)。每个任务都有一个项目属性。
String[] itemList = {"WINDOWS", "MATRIX", "3D", "FACEBOOK", "HOTDOG", "SECRET"};
Random rand = new Random();
ArrayList<CallableTask> taskQueue = new ArrayList<>();
for(int i = 0; i<16; i++){
int x = rand.nextInt(6);
taskQueue.add(new CallableTask(itemList[x]));
}
如果某项任务超时(我在每个 Callable returns 的 Map<>
中有一个超时标志)或异常,那么我想取消或删除所有具有相同的项目属性或者我想通知线程以避免这些任务。
我正在使用 invokeAll() 执行。
ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
List<Future<Map<String, String>>> executionResults = executorService.invokeAll(taskQueue);
下面是我的 Callable class 代码:
public class CallableTask implements Callable<Map<String, String>>{
private String item = "";
CallableTask(String input){
super();
item = input;
}
@Override
public Map<String, String> call() throws Exception {
Map<String, String> resultMap = new HashMap<>();
String[] sample = {"Y", "N", "N", "N", "Y"};
Random rand = new Random();
int x = 0;
try{
Thread.sleep(2000);
x = rand.nextInt(5);
resultMap.put("Thread", Thread.currentThread().getName());
int temp = 9/(x-1);
}
catch (Exception e){
throw new Exception("Divide by 0!!");
}
finally{
resultMap.put("Timeout", sample[x]);
resultMap.put("Item", item);
return resultMap;
}
//return resultMap;
}
}
我该如何实现?我想创建一个自定义 ThreadPoolExecutor 并尝试 afterExecute()
或 beforeExecute()
挂钩在任务之间进行干预,假设我可以在那里访问 Future<?>
,但 ThreadPoolExecutor 中只允许运行。另一方面,使用 Runnable 我无法获得结果。
创建任务时,与每个线程共享一个并发结构,以便它们可以协调:
Map<String, Boolean> abort = new ConcurrentHashMap<>();
...
taskQueue.add(new CallableTask(abort, itemList[x]));
然后,在每个任务中,在开始之前检查共享状态,并根据需要更新共享状态:
@Override
public Map<String, String> call() throws Exception {
if (abort.getOrDefault(item, false)) throw new ExecutionException();
...
try {
...
} catch (Exception ex) {
abort.put(item, true);
}
...
我有一个场景,我以某种方式模拟如下:
我有一个在 4 个线程中执行的可调用任务列表。执行应该return一个值(aMap<String, String>
)。每个任务都有一个项目属性。
String[] itemList = {"WINDOWS", "MATRIX", "3D", "FACEBOOK", "HOTDOG", "SECRET"};
Random rand = new Random();
ArrayList<CallableTask> taskQueue = new ArrayList<>();
for(int i = 0; i<16; i++){
int x = rand.nextInt(6);
taskQueue.add(new CallableTask(itemList[x]));
}
如果某项任务超时(我在每个 Callable returns 的 Map<>
中有一个超时标志)或异常,那么我想取消或删除所有具有相同的项目属性或者我想通知线程以避免这些任务。
我正在使用 invokeAll() 执行。
ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
List<Future<Map<String, String>>> executionResults = executorService.invokeAll(taskQueue);
下面是我的 Callable class 代码:
public class CallableTask implements Callable<Map<String, String>>{
private String item = "";
CallableTask(String input){
super();
item = input;
}
@Override
public Map<String, String> call() throws Exception {
Map<String, String> resultMap = new HashMap<>();
String[] sample = {"Y", "N", "N", "N", "Y"};
Random rand = new Random();
int x = 0;
try{
Thread.sleep(2000);
x = rand.nextInt(5);
resultMap.put("Thread", Thread.currentThread().getName());
int temp = 9/(x-1);
}
catch (Exception e){
throw new Exception("Divide by 0!!");
}
finally{
resultMap.put("Timeout", sample[x]);
resultMap.put("Item", item);
return resultMap;
}
//return resultMap;
}
}
我该如何实现?我想创建一个自定义 ThreadPoolExecutor 并尝试 afterExecute()
或 beforeExecute()
挂钩在任务之间进行干预,假设我可以在那里访问 Future<?>
,但 ThreadPoolExecutor 中只允许运行。另一方面,使用 Runnable 我无法获得结果。
创建任务时,与每个线程共享一个并发结构,以便它们可以协调:
Map<String, Boolean> abort = new ConcurrentHashMap<>();
...
taskQueue.add(new CallableTask(abort, itemList[x]));
然后,在每个任务中,在开始之前检查共享状态,并根据需要更新共享状态:
@Override
public Map<String, String> call() throws Exception {
if (abort.getOrDefault(item, false)) throw new ExecutionException();
...
try {
...
} catch (Exception ex) {
abort.put(item, true);
}
...