ExecutorService - 运行 并行任务并保存结果
ExecutorService - running tasks in parallel and save results
我想同时为最多 10 个用户发送 ping,并在 ping 完成后用结果更新用户对象。
为了做到这一点,我正在尝试使用 ExecutorService。
我从这样的代码开始:
private void pingUsers(List<User> userList) throws ExecutionException, InterruptedException {
final int NUM_THREADS = 10;
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
for (User user : userList) {
SnmpPingDevice pingUser = new PingUser(user);
Future<Boolean> isUserActive = executor.submit(pingUser);
user.isActive = isUserActive.get() ; // -- I guess it will block other pings and i'm back to my starting point where I need to run the pings in parallel.
}
executor.shutdown();
try {
executor.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("Failed to terminate executor");
}
}
这是我的 PingUser class 的样子:
@Override
public Boolean call() {
ping = new CmdRunner(toolDir,outDir,
new UserOidWorkerPing(version,community,ip,logger));
return this.isActive();
}
public boolean isActive(){
String cmd = ping.getCmdNoRedirect();
String rc = this.cmdRunner.runShellCmd(cmd,this.outDir +"/dummy",false);
logger.debug("PING was sent with cmd:" + cmd + ",rc:" + rc);
return rc != null && !rc.contains("Timeout:") && !rc.isEmpty();
}
回到同样的问题,ping 不会 运行 并行(一旦循环等待 isUserActive.get()
结束)
知道我错过了什么吗?我如何才能并行执行这些 ping 运行 并将每个用户的结果保存在我的 List<User> userList
?
中
您正在阻止每次调用的执行,使用以下行:
user.isActive = isUserActive.get() ;
这有效地等待 调用结束,并且每次调用都这样做。
您应该提交所有任务,并构建一个 Future
列表,以便仅在提交所有任务后等待结果。像这样:
List<Future<Boolean>> tasks = new ArrayList<>();
for (User user : userList) {
SnmpPingDevice pingUser = new PingUser(user);
tasks.add(executor.submit(pingUser));
}
for(Future<Boolean> task: tasks) {
//use the result... OK to get() here.
}
Future::get
是阻塞操作,因此调用线程将被阻塞,直到调用完成。所以你只有在上一个任务完成后才提交一个新任务。
考虑使用 ExecutorService::invokeAll
,这将 return 一个 Future
的列表:
List<PingUser> pingUsers = userList.stream().map(PingUser::new).collect(Collectors.toList());
List<Future<Boolean>> results = executor.invokeAll(pingUsers);
您可以将 user.isActive = future.get()
添加到您提交的 Runnable
中。
for (User user : userList) {
SnmpPingDevice pingUser = new PingUser(user);
executor.submit(() -> user.isActive = pingUser.call());
}
我想同时为最多 10 个用户发送 ping,并在 ping 完成后用结果更新用户对象。
为了做到这一点,我正在尝试使用 ExecutorService。
我从这样的代码开始:
private void pingUsers(List<User> userList) throws ExecutionException, InterruptedException {
final int NUM_THREADS = 10;
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
for (User user : userList) {
SnmpPingDevice pingUser = new PingUser(user);
Future<Boolean> isUserActive = executor.submit(pingUser);
user.isActive = isUserActive.get() ; // -- I guess it will block other pings and i'm back to my starting point where I need to run the pings in parallel.
}
executor.shutdown();
try {
executor.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("Failed to terminate executor");
}
}
这是我的 PingUser class 的样子:
@Override
public Boolean call() {
ping = new CmdRunner(toolDir,outDir,
new UserOidWorkerPing(version,community,ip,logger));
return this.isActive();
}
public boolean isActive(){
String cmd = ping.getCmdNoRedirect();
String rc = this.cmdRunner.runShellCmd(cmd,this.outDir +"/dummy",false);
logger.debug("PING was sent with cmd:" + cmd + ",rc:" + rc);
return rc != null && !rc.contains("Timeout:") && !rc.isEmpty();
}
回到同样的问题,ping 不会 运行 并行(一旦循环等待 isUserActive.get()
结束)
知道我错过了什么吗?我如何才能并行执行这些 ping 运行 并将每个用户的结果保存在我的 List<User> userList
?
您正在阻止每次调用的执行,使用以下行:
user.isActive = isUserActive.get() ;
这有效地等待 调用结束,并且每次调用都这样做。
您应该提交所有任务,并构建一个 Future
列表,以便仅在提交所有任务后等待结果。像这样:
List<Future<Boolean>> tasks = new ArrayList<>();
for (User user : userList) {
SnmpPingDevice pingUser = new PingUser(user);
tasks.add(executor.submit(pingUser));
}
for(Future<Boolean> task: tasks) {
//use the result... OK to get() here.
}
Future::get
是阻塞操作,因此调用线程将被阻塞,直到调用完成。所以你只有在上一个任务完成后才提交一个新任务。
考虑使用 ExecutorService::invokeAll
,这将 return 一个 Future
的列表:
List<PingUser> pingUsers = userList.stream().map(PingUser::new).collect(Collectors.toList());
List<Future<Boolean>> results = executor.invokeAll(pingUsers);
您可以将 user.isActive = future.get()
添加到您提交的 Runnable
中。
for (User user : userList) {
SnmpPingDevice pingUser = new PingUser(user);
executor.submit(() -> user.isActive = pingUser.call());
}