如何使用 ExecutorService 轮询直到结果到达
How to use ExecutorService to poll until a result arrives
我有一个场景,我必须轮询远程服务器以检查任务是否已完成。完成后,我会进行不同的调用以检索结果。
我最初认为我应该使用 SingleThreadScheduledExecutor
和 scheduleWithFixedDelay
进行轮询:
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId), 0, 10, TimeUnit.SECONDS);
public void poll(String jobId) {
boolean jobDone = remoteServer.isJobDone(jobId);
if (jobDone) {
retrieveJobResult(jobId);
}
}
但是因为我只能提供一个Runnable
到scheduleWithFixedDelay
而不能return任何东西,我不知道future
什么时候会完成,如果曾经。调用 future.get()
到底是什么意思?我在等什么结果?
我第一次检测到远程任务已经完成,我想执行一个不同的远程调用并将其结果设置为 future
的值。我想我可以为此使用 CompletableFuture,我将转发到我的 poll
方法,该方法又将它转发到我的 retrieveTask
方法,最终完成它:
CompletableFuture<Object> result = new CompletableFuture<Object>();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId, result), 0, 10, TimeUnit.SECONDS);
public void poll(String jobId, CompletableFuture<Object> result) {
boolean jobDone = remoteServer.isJobDone(jobId);
if (jobDone) {
retrieveJobResult(jobId, result);
}
}
public void retrieveJobResult(String jobId, CompletableFuture<Object> result) {
Object remoteResult = remoteServer.getJobResult(jobId);
result.complete(remoteResult);
}
但这有很多问题。其一,CompletableFuture
似乎并不适合这种用途。相反,我认为我应该做 CompletableFuture.supplyAsync(() -> poll(jobId))
,但是当我的 CompletableFuture
是 canceled/complete?感觉轮询应该以完全不同的方式实现。
我认为 CompletableFutures 是一个很好的方法:
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private void run() {
final Object jobResult = pollForCompletion("jobId1")
.thenApply(jobId -> remoteServer.getJobResult(jobId))
.get();
}
private CompletableFuture<String> pollForCompletion(final String jobId) {
CompletableFuture<String> completionFuture = new CompletableFuture<>();
final ScheduledFuture<Void> checkFuture = executor.scheduleAtFixedRate(() -> {
if (remoteServer.isJobDone(jobId)) {
completionFuture.complete(jobId);
}
}, 0, 10, TimeUnit.SECONDS);
completionFuture.whenComplete((result, thrown) -> {
checkFuture.cancel(true);
});
return completionFuture;
}
在我看来,您比其他任何人都更担心某些文体问题。在java8中,CompletableFuture
有2个作用:一个是传统的future,给任务执行和状态查询的异步源;另一个就是我们通常所说的承诺。一个承诺,如果你还不知道,可以被认为是未来的建设者及其完成来源。所以在这种情况下,直觉上需要一个承诺,这正是您在这里使用的情况。你担心的例子是向你介绍第一次使用的东西,而不是承诺方式。
接受这一点,你应该更容易开始处理你的实际问题。我认为这个承诺应该有两个作用,一个是通知你的任务完成轮询,另一个是在完成时取消你的预定任务。这里应该是最终的解决方案:
public CompletableFuture<Object> pollTask(int jobId) {
CompletableFuture<Object> fut = new CompletableFuture<>();
ScheduledFuture<?> sfuture = executor.scheduleWithFixedDelay(() -> _poll(jobId, fut), 0, 10, TimeUnit.SECONDS);
fut.thenAccept(ignore -> sfuture.cancel(false));
return fut;
}
private void _poll(int jobId, CompletableFuture<Object> fut) {
// whatever polls
if (isDone) {
fut.complete(yourResult);
}
}
我受 使用 Supplier<Optional<T>>
的启发为此创建了一个通用实用程序,每个轮询都可以 return Optional.empty()
直到值准备就绪。我还实现了 timeout
,以便在超过最大时间时抛出 TimeoutException
。
用法:
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
Supplier<Optional<String>> supplier = () -> remoteServer.isJobDone(jobId) ? Optional.of(jobId) : Optional.empty();
CompletableFuture<String> future = ScheduledCompletableFuture.builder(String.class)
.supplier(supplier)
.executorService(scheduledExecutor)
.timeUnit(TimeUnit.SECONDS)
.initialDelay(5)
.period(5)
.timeout(60 * 5)
.build();
ScheduledCompletableFuture.java
public class ScheduledCompletableFuture {
public static class ScheduledCompletableFutureBuilder<T> {
private Supplier<Optional<T>> supplier;
private ScheduledExecutorService executorService;
private Long initialDelay;
private Long period;
private Long timeout;
private TimeUnit timeUnit;
public ScheduledCompletableFutureBuilder() {
}
public ScheduledCompletableFutureBuilder<T> supplier(Supplier<Optional<T>> supplier) {
this.supplier = supplier;
return this;
}
public ScheduledCompletableFutureBuilder<T> executorService(ScheduledExecutorService executorService) {
this.executorService = executorService;
return this;
}
public ScheduledCompletableFutureBuilder<T> initialDelay(long initialDelay) {
this.initialDelay = initialDelay;
return this;
}
public ScheduledCompletableFutureBuilder<T> period(long period) {
this.period = period;
return this;
}
public ScheduledCompletableFutureBuilder<T> timeout(long timeout) {
this.timeout = timeout;
return this;
}
public ScheduledCompletableFutureBuilder<T> timeUnit(TimeUnit timeUnit) {
this.timeUnit = timeUnit;
return this;
}
public CompletableFuture<T> build() {
// take a copy of instance variables so that the Builder can be re-used
Supplier<Optional<T>> supplier = this.supplier;
ScheduledExecutorService executorService = this.executorService;
Long initialDelay = this.initialDelay;
Long period = this.period;
Long timeout = this.timeout;
TimeUnit timeUnit = this.timeUnit;
CompletableFuture<T> completableFuture = new CompletableFuture<>();
long endMillis = System.currentTimeMillis() + timeUnit.toMillis(timeout);
Runnable command = () -> {
Optional<T> optional = supplier.get();
if (optional.isPresent()) {
completableFuture.complete(optional.get());
} else if (System.currentTimeMillis() > endMillis) {
String msg = String.format("Supplier did not return a value within %s %s", timeout, timeUnit);
completableFuture.completeExceptionally(new TimeoutException(msg));
}
};
ScheduledFuture<?> scheduledFuture = executorService.scheduleAtFixedRate(command, initialDelay, period, timeUnit);
return completableFuture.whenComplete((result, exception) -> scheduledFuture.cancel(true));
}
}
public static <T> ScheduledCompletableFutureBuilder<T> builder(Class<T> type) {
return new ScheduledCompletableFutureBuilder<>();
}
}
我有一个场景,我必须轮询远程服务器以检查任务是否已完成。完成后,我会进行不同的调用以检索结果。
我最初认为我应该使用 SingleThreadScheduledExecutor
和 scheduleWithFixedDelay
进行轮询:
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId), 0, 10, TimeUnit.SECONDS);
public void poll(String jobId) {
boolean jobDone = remoteServer.isJobDone(jobId);
if (jobDone) {
retrieveJobResult(jobId);
}
}
但是因为我只能提供一个Runnable
到scheduleWithFixedDelay
而不能return任何东西,我不知道future
什么时候会完成,如果曾经。调用 future.get()
到底是什么意思?我在等什么结果?
我第一次检测到远程任务已经完成,我想执行一个不同的远程调用并将其结果设置为 future
的值。我想我可以为此使用 CompletableFuture,我将转发到我的 poll
方法,该方法又将它转发到我的 retrieveTask
方法,最终完成它:
CompletableFuture<Object> result = new CompletableFuture<Object>();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId, result), 0, 10, TimeUnit.SECONDS);
public void poll(String jobId, CompletableFuture<Object> result) {
boolean jobDone = remoteServer.isJobDone(jobId);
if (jobDone) {
retrieveJobResult(jobId, result);
}
}
public void retrieveJobResult(String jobId, CompletableFuture<Object> result) {
Object remoteResult = remoteServer.getJobResult(jobId);
result.complete(remoteResult);
}
但这有很多问题。其一,CompletableFuture
似乎并不适合这种用途。相反,我认为我应该做 CompletableFuture.supplyAsync(() -> poll(jobId))
,但是当我的 CompletableFuture
是 canceled/complete?感觉轮询应该以完全不同的方式实现。
我认为 CompletableFutures 是一个很好的方法:
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private void run() {
final Object jobResult = pollForCompletion("jobId1")
.thenApply(jobId -> remoteServer.getJobResult(jobId))
.get();
}
private CompletableFuture<String> pollForCompletion(final String jobId) {
CompletableFuture<String> completionFuture = new CompletableFuture<>();
final ScheduledFuture<Void> checkFuture = executor.scheduleAtFixedRate(() -> {
if (remoteServer.isJobDone(jobId)) {
completionFuture.complete(jobId);
}
}, 0, 10, TimeUnit.SECONDS);
completionFuture.whenComplete((result, thrown) -> {
checkFuture.cancel(true);
});
return completionFuture;
}
在我看来,您比其他任何人都更担心某些文体问题。在java8中,CompletableFuture
有2个作用:一个是传统的future,给任务执行和状态查询的异步源;另一个就是我们通常所说的承诺。一个承诺,如果你还不知道,可以被认为是未来的建设者及其完成来源。所以在这种情况下,直觉上需要一个承诺,这正是您在这里使用的情况。你担心的例子是向你介绍第一次使用的东西,而不是承诺方式。
接受这一点,你应该更容易开始处理你的实际问题。我认为这个承诺应该有两个作用,一个是通知你的任务完成轮询,另一个是在完成时取消你的预定任务。这里应该是最终的解决方案:
public CompletableFuture<Object> pollTask(int jobId) {
CompletableFuture<Object> fut = new CompletableFuture<>();
ScheduledFuture<?> sfuture = executor.scheduleWithFixedDelay(() -> _poll(jobId, fut), 0, 10, TimeUnit.SECONDS);
fut.thenAccept(ignore -> sfuture.cancel(false));
return fut;
}
private void _poll(int jobId, CompletableFuture<Object> fut) {
// whatever polls
if (isDone) {
fut.complete(yourResult);
}
}
我受 Supplier<Optional<T>>
的启发为此创建了一个通用实用程序,每个轮询都可以 return Optional.empty()
直到值准备就绪。我还实现了 timeout
,以便在超过最大时间时抛出 TimeoutException
。
用法:
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
Supplier<Optional<String>> supplier = () -> remoteServer.isJobDone(jobId) ? Optional.of(jobId) : Optional.empty();
CompletableFuture<String> future = ScheduledCompletableFuture.builder(String.class)
.supplier(supplier)
.executorService(scheduledExecutor)
.timeUnit(TimeUnit.SECONDS)
.initialDelay(5)
.period(5)
.timeout(60 * 5)
.build();
ScheduledCompletableFuture.java
public class ScheduledCompletableFuture {
public static class ScheduledCompletableFutureBuilder<T> {
private Supplier<Optional<T>> supplier;
private ScheduledExecutorService executorService;
private Long initialDelay;
private Long period;
private Long timeout;
private TimeUnit timeUnit;
public ScheduledCompletableFutureBuilder() {
}
public ScheduledCompletableFutureBuilder<T> supplier(Supplier<Optional<T>> supplier) {
this.supplier = supplier;
return this;
}
public ScheduledCompletableFutureBuilder<T> executorService(ScheduledExecutorService executorService) {
this.executorService = executorService;
return this;
}
public ScheduledCompletableFutureBuilder<T> initialDelay(long initialDelay) {
this.initialDelay = initialDelay;
return this;
}
public ScheduledCompletableFutureBuilder<T> period(long period) {
this.period = period;
return this;
}
public ScheduledCompletableFutureBuilder<T> timeout(long timeout) {
this.timeout = timeout;
return this;
}
public ScheduledCompletableFutureBuilder<T> timeUnit(TimeUnit timeUnit) {
this.timeUnit = timeUnit;
return this;
}
public CompletableFuture<T> build() {
// take a copy of instance variables so that the Builder can be re-used
Supplier<Optional<T>> supplier = this.supplier;
ScheduledExecutorService executorService = this.executorService;
Long initialDelay = this.initialDelay;
Long period = this.period;
Long timeout = this.timeout;
TimeUnit timeUnit = this.timeUnit;
CompletableFuture<T> completableFuture = new CompletableFuture<>();
long endMillis = System.currentTimeMillis() + timeUnit.toMillis(timeout);
Runnable command = () -> {
Optional<T> optional = supplier.get();
if (optional.isPresent()) {
completableFuture.complete(optional.get());
} else if (System.currentTimeMillis() > endMillis) {
String msg = String.format("Supplier did not return a value within %s %s", timeout, timeUnit);
completableFuture.completeExceptionally(new TimeoutException(msg));
}
};
ScheduledFuture<?> scheduledFuture = executorService.scheduleAtFixedRate(command, initialDelay, period, timeUnit);
return completableFuture.whenComplete((result, exception) -> scheduledFuture.cancel(true));
}
}
public static <T> ScheduledCompletableFutureBuilder<T> builder(Class<T> type) {
return new ScheduledCompletableFutureBuilder<>();
}
}