使用 ExecutorService 监视 Future<T> 个对象以完成
Watching Future<T> objects for completion using ExecutorService
我有一个程序,其中我使用提交给 ExecutorService 的 Callable 对象从 S3 下载文件。文件很大,需要几分钟才能完全下载。创建另一个 Callable class 从下载器中获取未来并观察它完成是否有意义?我的最终目标是将所有完整的下载添加到缓存中位于中心的列表。
例如:
public void add(final String s3urlToDownload){
Future<S3Object> futureS3obj = cachedPoolExecutor.submit(new S3Downloader(s3urlToDownload));
// Instead of calling futureS3obj.get() and waiting, submit the Future to the "Watcher" service.
// Within FutureWatcher, the S3Object will be added to the List once the download is complete.
cachedPoolExecutor.submit(new FutureWatcher(downloadedList, futureS3obj))
}
与其制作一个消耗资源的'watcher',不如让当前下载完成时通知主人。
这里有一些假对象,用于说明。 "Downloads" 只是随机睡眠:
// for illustration only
class S3Object {
String id;
}
// for illustration only
class S3Downloader {
public S3Object download(String url) {
int min = 2;
int max = 5;
Random rand = new Random();
int random = rand.nextInt((max - min) + 1) + min;
try { Thread.sleep(1000 * random); } catch (Exception ex) {}
S3Object result = new S3Object();
result.id = url;
return result;
}
}
我们可以定义一个任务来下载文件、更新(线程安全的)列表并递减 CountDownLatch
:
class MyTask implements Runnable {
private final List<S3Object> list;
private final CountDownLatch latch;
private final String url;
public MyTask(List<S3Object> list, CountDownLatch latch, String url) {
this.list = list;
this.latch = latch;
this.url = url;
}
public void run() {
S3Downloader downloader = new S3Downloader();
S3Object result = downloader.download(url);
list.add(result);
latch.countDown();
}
}
一个 Runner 示例说明了 "client"。 go
方法是驱动,使用add
方法(不阻塞):
public class Runner {
private ExecutorService pool = Executors.newCachedThreadPool();
private int numUrls = 20;
private CountDownLatch latch = new CountDownLatch(numUrls);
private List<S3Object> results = Collections.synchronizedList(new ArrayList<S3Object>());
public void add(String url) {
pool.submit(new MyTask(results, latch, url));
}
public void go() throws Exception {
for(int i = 0; i < numUrls; i++) {
String url = "http://example" + i;
add(url);
}
// wait for all downloads
latch.await();
for (S3Object result : results) {
System.out.println("result id: " + result.id);
}
}
}
生产代码必须处理错误并可能根据需要重新组织客户端。
我有一个程序,其中我使用提交给 ExecutorService 的 Callable 对象从 S3 下载文件。文件很大,需要几分钟才能完全下载。创建另一个 Callable class 从下载器中获取未来并观察它完成是否有意义?我的最终目标是将所有完整的下载添加到缓存中位于中心的列表。
例如:
public void add(final String s3urlToDownload){
Future<S3Object> futureS3obj = cachedPoolExecutor.submit(new S3Downloader(s3urlToDownload));
// Instead of calling futureS3obj.get() and waiting, submit the Future to the "Watcher" service.
// Within FutureWatcher, the S3Object will be added to the List once the download is complete.
cachedPoolExecutor.submit(new FutureWatcher(downloadedList, futureS3obj))
}
与其制作一个消耗资源的'watcher',不如让当前下载完成时通知主人。
这里有一些假对象,用于说明。 "Downloads" 只是随机睡眠:
// for illustration only
class S3Object {
String id;
}
// for illustration only
class S3Downloader {
public S3Object download(String url) {
int min = 2;
int max = 5;
Random rand = new Random();
int random = rand.nextInt((max - min) + 1) + min;
try { Thread.sleep(1000 * random); } catch (Exception ex) {}
S3Object result = new S3Object();
result.id = url;
return result;
}
}
我们可以定义一个任务来下载文件、更新(线程安全的)列表并递减 CountDownLatch
:
class MyTask implements Runnable {
private final List<S3Object> list;
private final CountDownLatch latch;
private final String url;
public MyTask(List<S3Object> list, CountDownLatch latch, String url) {
this.list = list;
this.latch = latch;
this.url = url;
}
public void run() {
S3Downloader downloader = new S3Downloader();
S3Object result = downloader.download(url);
list.add(result);
latch.countDown();
}
}
一个 Runner 示例说明了 "client"。 go
方法是驱动,使用add
方法(不阻塞):
public class Runner {
private ExecutorService pool = Executors.newCachedThreadPool();
private int numUrls = 20;
private CountDownLatch latch = new CountDownLatch(numUrls);
private List<S3Object> results = Collections.synchronizedList(new ArrayList<S3Object>());
public void add(String url) {
pool.submit(new MyTask(results, latch, url));
}
public void go() throws Exception {
for(int i = 0; i < numUrls; i++) {
String url = "http://example" + i;
add(url);
}
// wait for all downloads
latch.await();
for (S3Object result : results) {
System.out.println("result id: " + result.id);
}
}
}
生产代码必须处理错误并可能根据需要重新组织客户端。