Why does future.get() get stuck?
I am trying to upload a file in parallel from a client to several servers using the invokeAll() method. This is the code:
List<UploadTask> uploadTasks = new ArrayList<>();
ExecutorService taskExecutor = Executors.newFixedThreadPool(result.size());
for (ClusterElement clusterElement : result)
    uploadTasks.add(new UploadTask(localFile, fileName, clusterElement));
//wait at most 13 minutes for the uploads of the file on all the servers
List<Future<ClusterElement>> uploadResult = taskExecutor.invokeAll(uploadTasks, 13, TimeUnit.MINUTES);
System.out.println("Timeout!");
System.out.println("Printing size " + uploadResult.size());
//after the timeout, abort all FTPClients (if already finished, nothing happens)
for (int i = 0; i < uploadResult.size(); i++)
{
    Future<ClusterElement> future = uploadResult.get(i);
    ClusterElement clusterElement = result.get(i);
    System.out.println("Result of " + clusterElement.getId() + " cancelled=" + future.isCancelled());
    try {
        if (future.isCancelled()) {//if task not finished, then future cancelled (interrupted)
            System.out.println("Deleting file on " + clusterElement.getId());
            removeFile(clusterElement.getAddress(), clusterElement.getPort(), fileName, clusterElement);
        }
        else {
            System.out.println("Getting result...");
            ClusterElement serverUploaded = future.get();//task finished: upload complete (or failed)
            if (serverUploaded != null)//successfully uploaded
            {
                System.out.println("Successfully uploaded on " + serverUploaded.getId());
                servers.add(serverUploaded);
            }
            else
                System.out.println("Uploading failed on " + servers.get(i));
        }
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}
where UploadTask.call() is:
@Override
public ClusterElement call() throws Exception {
    ClusterElement result;//null = uploading failed, server where the file was uploaded otherwise
    try (FileInputStream file = new FileInputStream(localFile)) {
        FTPClient ftpClient = SingleClientRequest.createClient(clusterElement.getAddress(), clusterElement.getPort());
        ftpClient.enterLocalPassiveMode();
        System.out.println("Task uploading file on " + clusterElement.getId());
        if (ftpClient.storeFile(fileName + ".tmp", file)) {//so the Garbage Collector will not delete it during upload
            ftpClient.rename(fileName + ".tmp", fileName);//rename the file with its real name
            result = clusterElement;
        }
        else {
            System.out.println("File " + fileName + " not uploaded on " + clusterElement.getId());
            result = null;
        }
    } catch (IOException e) {
        System.err.println("Error uploading " + localFile + " on " + clusterElement.getId());
        result = null;
    }
    System.out.println("Returning " + result);
    return result;
}
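In other words, if the file is not uploaded within the timeout, it is deleted from the FS (unfortunately FTPClient has no abort operation for this purpose, so I have to delete the file while it is still being uploaded).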
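To illustrate the timeout behaviour this relies on, here is a minimal sketch that is separate from the upload code. The class and method names (InvokeAllTimeoutDemo, runAndReportCancelled) and the two toy tasks are made up for the example. It shows that invokeAll() with a timeout returns one Future per task, all of them done, and that only the tasks still running at the timeout are reported as cancelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class InvokeAllTimeoutDemo {

    // Runs one fast and one slow task with a short timeout and reports,
    // per task, whether invokeAll() cancelled its Future.
    public static List<Boolean> runAndReportCancelled() throws InterruptedException {
        List<Callable<String>> tasks = new ArrayList<>();
        Callable<String> fast = () -> "fast";   // finishes immediately
        Callable<String> slow = () -> {         // outlives the timeout below
            Thread.sleep(5_000);
            return "slow";
        };
        tasks.add(fast);
        tasks.add(slow);

        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        try {
            // Blocks until every task has finished or the timeout expires;
            // tasks that have not completed by then are cancelled.
            List<Future<String>> futures = pool.invokeAll(tasks, 500, TimeUnit.MILLISECONDS);
            List<Boolean> cancelled = new ArrayList<>();
            for (Future<String> f : futures) {
                cancelled.add(f.isCancelled());
            }
            return cancelled;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Expected: the fast task is not cancelled, the slow one is.
        System.out.println(runAndReportCancelled());
    }
}
```

Calling get() on a Future that was not cancelled cannot block after invokeAll() has returned, because every returned Future is already done.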
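The problem now is that if I kill one of the servers, UploadTask.call() returns null (because an IOException is thrown), so the future is completed and therefore not cancelled... but when I try ClusterElement serverUploaded = future.get(); it gets stuck there instead of returning null!
Why does this happen?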
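I found out by myself that the problem was caused by this line:
System.out.println("Uploading failed on "+servers.get(i));
because servers is a list that is still empty at that point, so the process got stuck there.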
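For reference, a small sketch of that failure path. It assumes servers is a plain java.util.ArrayList (its declaration is not shown above) and uses made-up names (NullResultDemo, describeFailure). Under that assumption, future.get() on a completed task returns null right away, and servers.get(...) on the empty list throws an unchecked IndexOutOfBoundsException, which a catch (ExecutionException e) block does not handle. If nothing further up reports that exception, the last visible output is "Getting result...", which can look like a hang in future.get().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class NullResultDemo {

    // Reproduces the "upload failed" branch with a task that returns null
    // and an empty list standing in for 'servers' (assumed to be an ArrayList).
    public static String describeFailure() throws InterruptedException {
        List<String> servers = new ArrayList<>();
        Callable<String> failedUpload = () -> null; // mimics call() returning null
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(failedUpload);
            String uploaded = future.get(); // waits for the task, then yields null
            if (uploaded == null) {
                // Indexing into the empty list fails here, after get() has returned.
                return "Uploading failed on " + servers.get(0);
            }
            return "uploaded on " + uploaded;
        } catch (ExecutionException e) {
            // Not reached: the task returned normally instead of throwing.
            return "ExecutionException";
        } catch (IndexOutOfBoundsException e) {
            return "get() returned null, then servers.get(0) threw IndexOutOfBoundsException";
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(describeFailure());
    }
}
```

Printing clusterElement.getId() in the failure branch, instead of indexing into servers, would avoid the exception.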