如何使用 GPars 在给定的超时时间内获得多个异步结果?
How to get multiple async results within a given timeout with GPars?
我想在特定的超时时间内使用并行处理检索多个 "costly" 结果。
我正在使用 GPars Dataflow.task,但看起来我缺少一些东西作为过程 returns 只有当所有数据流变量都被绑定时。
def timeout = 500
def mapResults = []
GParsPool.withPool(3) {
def taskWeb1 = Dataflow.task {
mapResults.web1 = new URL('http://web1.com').getText()
}.join(timeout, TimeUnit.MILLISECONDS)
def taskWeb2 = Dataflow.task {
mapResults.web2 = new URL('http://web2.com').getText()
}.join(timeout, TimeUnit.MILLISECONDS)
def taskWeb3 = Dataflow.task {
mapResults.web3 = new URL('http://web3.com').getText()
}.join(timeout, TimeUnit.MILLISECONDS)
}
我确实在 GPars Timeouts doc 中看到了一种使用 Select 在超时内获得最快结果的方法。
但我正在寻找一种方法来在给定的时间范围内检索尽可能多的结果。
有没有更好的"GPars"方法来实现这个?
或者用 Java 8 Future/Callable ?
由于您也对 Java 基于 8 的解决方案感兴趣,这里有一种方法可以做到:
int timeout = 250;
ExecutorService executorService = Executors.newFixedThreadPool(3);
try {
Map<String, CompletableFuture<String>> map =
Stream.of("http://google.com", "http://yahoo.com", "http://bing.com")
.collect(
Collectors.toMap(
// the key will be the URL
Function.identity(),
// the value will be the CompletableFuture text fetched from the url
(url) -> CompletableFuture.supplyAsync(
() -> readUrl(url, timeout),
executorService
)
)
);
executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS);
//print the resulting map, cutting the text at 100 chars
map.entrySet().stream().forEach(entry -> {
CompletableFuture<String> future = entry.getValue();
boolean completed = future.isDone()
&& !future.isCompletedExceptionally()
&& !future.isCancelled();
System.out.printf("url %s completed: %s, error: %s, result: %.100s\n",
entry.getKey(),
completed,
future.isCompletedExceptionally(),
completed ? future.getNow(null) : null);
});
} catch (InterruptedException e) {
//rethrow
} finally {
executorService.shutdownNow();
}
这会为您提供与您拥有的 URL 一样多的 Future
,但让您有机会查看是否有任何任务因异常而失败。如果您对这些异常不感兴趣,可以简化代码,只关注成功检索的内容:
int timeout = 250;
ExecutorService executorService = Executors.newFixedThreadPool(3);
try {
Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
Stream.of("http://google.com", "http://yahoo.com", "http://bing.com")
.forEach(url -> {
CompletableFuture
.supplyAsync(
() -> readUrl(url, timeout),
executorService
).thenAccept(content -> map.put(url, content));
});
executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS);
//print the resulting map, cutting the text at 100 chars
map.entrySet().stream().forEach(entry -> {
System.out.printf("url %s completed, result: %.100s\n",
entry.getKey(), entry.getValue() );
});
} catch (InterruptedException e) {
//rethrow
} finally {
executorService.shutdownNow();
}
在打印结果之前,这两个代码都将等待大约 250 毫秒(由于将任务提交给执行程序服务,因此只需要多一点点时间)。我发现大约 250 毫秒是可以在我的网络上获取其中一些 url-s 的阈值,但不一定是全部。随意调整超时进行实验。
对于 readUrl(url, timeout)
方法,您可以使用像 Apache Commons IO 这样的实用程序库。即使您没有明确考虑 timeout
参数,提交给执行程序服务的任务也会收到中断信号。我可以为此提供一个实现,但我认为它超出了你问题中主要问题的范围。
我想在特定的超时时间内使用并行处理检索多个 "costly" 结果。
我正在使用 GPars Dataflow.task,但看起来我缺少一些东西作为过程 returns 只有当所有数据流变量都被绑定时。
def timeout = 500
def mapResults = []
GParsPool.withPool(3) {
def taskWeb1 = Dataflow.task {
mapResults.web1 = new URL('http://web1.com').getText()
}.join(timeout, TimeUnit.MILLISECONDS)
def taskWeb2 = Dataflow.task {
mapResults.web2 = new URL('http://web2.com').getText()
}.join(timeout, TimeUnit.MILLISECONDS)
def taskWeb3 = Dataflow.task {
mapResults.web3 = new URL('http://web3.com').getText()
}.join(timeout, TimeUnit.MILLISECONDS)
}
我确实在 GPars Timeouts doc 中看到了一种使用 Select 在超时内获得最快结果的方法。 但我正在寻找一种方法来在给定的时间范围内检索尽可能多的结果。
有没有更好的"GPars"方法来实现这个? 或者用 Java 8 Future/Callable ?
由于您也对 Java 基于 8 的解决方案感兴趣,这里有一种方法可以做到:
int timeout = 250;
ExecutorService executorService = Executors.newFixedThreadPool(3);
try {
Map<String, CompletableFuture<String>> map =
Stream.of("http://google.com", "http://yahoo.com", "http://bing.com")
.collect(
Collectors.toMap(
// the key will be the URL
Function.identity(),
// the value will be the CompletableFuture text fetched from the url
(url) -> CompletableFuture.supplyAsync(
() -> readUrl(url, timeout),
executorService
)
)
);
executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS);
//print the resulting map, cutting the text at 100 chars
map.entrySet().stream().forEach(entry -> {
CompletableFuture<String> future = entry.getValue();
boolean completed = future.isDone()
&& !future.isCompletedExceptionally()
&& !future.isCancelled();
System.out.printf("url %s completed: %s, error: %s, result: %.100s\n",
entry.getKey(),
completed,
future.isCompletedExceptionally(),
completed ? future.getNow(null) : null);
});
} catch (InterruptedException e) {
//rethrow
} finally {
executorService.shutdownNow();
}
这会为您提供与您拥有的 URL 一样多的 Future
,但让您有机会查看是否有任何任务因异常而失败。如果您对这些异常不感兴趣,可以简化代码,只关注成功检索的内容:
int timeout = 250;
ExecutorService executorService = Executors.newFixedThreadPool(3);
try {
Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
Stream.of("http://google.com", "http://yahoo.com", "http://bing.com")
.forEach(url -> {
CompletableFuture
.supplyAsync(
() -> readUrl(url, timeout),
executorService
).thenAccept(content -> map.put(url, content));
});
executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS);
//print the resulting map, cutting the text at 100 chars
map.entrySet().stream().forEach(entry -> {
System.out.printf("url %s completed, result: %.100s\n",
entry.getKey(), entry.getValue() );
});
} catch (InterruptedException e) {
//rethrow
} finally {
executorService.shutdownNow();
}
在打印结果之前,这两个代码都将等待大约 250 毫秒(由于将任务提交给执行程序服务,因此只需要多一点点时间)。我发现大约 250 毫秒是可以在我的网络上获取其中一些 url-s 的阈值,但不一定是全部。随意调整超时进行实验。
对于 readUrl(url, timeout)
方法,您可以使用像 Apache Commons IO 这样的实用程序库。即使您没有明确考虑 timeout
参数,提交给执行程序服务的任务也会收到中断信号。我可以为此提供一个实现,但我认为它超出了你问题中主要问题的范围。