使用 ExecutorService 监视 Future<T> 个对象以完成

Watching Future<T> objects for completion using ExecutorService

我有一个程序,其中我使用提交给 ExecutorService 的 Callable 对象从 S3 下载文件。文件很大,需要几分钟才能完全下载。创建另一个 Callable class 从下载器中获取未来并观察它完成是否有意义?我的最终目标是将所有完整的下载添加到缓存中位于中心的列表。

例如:

public void add(final String s3urlToDownload){

    Future<S3Object> futureS3obj = cachedPoolExecutor.submit(new S3Downloader(s3urlToDownload));

    // Instead of calling futureS3obj.get() and waiting, submit the Future to the "Watcher" service.
    // Within FutureWatcher, the S3Object will be added to the List once the download is complete.
    cachedPoolExecutor.submit(new FutureWatcher(downloadedList, futureS3obj))

}

与其制作一个消耗资源的'watcher',不如让当前下载完成时通知主人。

这里有一些假对象,用于说明。 "Downloads" 只是随机睡眠:

// for illustration only
class S3Object {
    String id;
}

// for illustration only
class S3Downloader {

    public S3Object download(String url) {
        int min = 2;
        int max = 5;
        Random rand = new Random();
        int random = rand.nextInt((max - min) + 1) + min;

        try { Thread.sleep(1000 * random); } catch (Exception ex) {}
        S3Object result = new S3Object();
        result.id = url;
        return result;
    }
}

我们可以定义一个任务来下载文件、更新(线程安全的)列表并递减 CountDownLatch:

class MyTask implements Runnable {
    private final List<S3Object> list;
    private final CountDownLatch latch;
    private final String url; 

    public MyTask(List<S3Object> list, CountDownLatch latch, String url) {
        this.list = list;
        this.latch = latch;
        this.url = url;
    }     

    public void run() {
        S3Downloader downloader = new S3Downloader();
        S3Object result = downloader.download(url);
        list.add(result);
        latch.countDown();
    }
}

一个 Runner 示例说明了 "client"。 go方法是驱动,使用add方法(不阻塞):

public class Runner {
    private ExecutorService pool = Executors.newCachedThreadPool();
    private int numUrls = 20;
    private CountDownLatch latch = new CountDownLatch(numUrls);
    private List<S3Object> results = Collections.synchronizedList(new ArrayList<S3Object>());

    public void add(String url) {
        pool.submit(new MyTask(results, latch, url));
    }

    public void go() throws Exception {

        for(int i = 0; i < numUrls; i++) {
            String url = "http://example" + i;
            add(url);
        }

        // wait for all downloads
        latch.await();

        for (S3Object result : results) {
            System.out.println("result id: " + result.id);
        }
    }
}

生产代码必须处理错误并可能根据需要重新组织客户端。