如何使用 ExecutorService 轮询直到结果到达

How to use ExecutorService to poll until a result arrives

我有一个场景,我必须轮询远程服务器以检查任务是否已完成。完成后,我会进行不同的调用以检索结果。

我最初认为我应该使用 SingleThreadScheduledExecutorscheduleWithFixedDelay 进行轮询:

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId), 0, 10, TimeUnit.SECONDS);

public void poll(String jobId) {
   boolean jobDone = remoteServer.isJobDone(jobId);
   if (jobDone) {
       retrieveJobResult(jobId);
   }
}

但是因为我只能提供一个RunnablescheduleWithFixedDelay而不能return任何东西,我不知道future什么时候会完成,如果曾经。调用 future.get() 到底是什么意思?我在等什么结果?

我第一次检测到远程任务已经完成,我想执行一个不同的远程调用并将其结果设置为 future 的值。我想我可以为此使用 CompletableFuture,我将转发到我的 poll 方法,该方法又将它转发到我的 retrieveTask 方法,最终完成它:

CompletableFuture<Object> result = new CompletableFuture<Object>();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId, result), 0, 10, TimeUnit.SECONDS);

public void poll(String jobId, CompletableFuture<Object> result) {
   boolean jobDone = remoteServer.isJobDone(jobId);
   if (jobDone) {
       retrieveJobResult(jobId, result);
   }
}

public void retrieveJobResult(String jobId, CompletableFuture<Object> result) {
    Object remoteResult = remoteServer.getJobResult(jobId);
    result.complete(remoteResult);
}

但这有很多问题。其一,CompletableFuture 似乎并不适合这种用途。相反,我认为我应该做 CompletableFuture.supplyAsync(() -> poll(jobId)),但是当我的 CompletableFuture 是 canceled/complete?感觉轮询应该以完全不同的方式实现。

我认为 CompletableFutures 是一个很好的方法:

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

private void run() {
    final Object jobResult = pollForCompletion("jobId1")
            .thenApply(jobId -> remoteServer.getJobResult(jobId))
            .get();

}

private CompletableFuture<String> pollForCompletion(final String jobId) {
    CompletableFuture<String> completionFuture = new CompletableFuture<>();
    final ScheduledFuture<Void> checkFuture = executor.scheduleAtFixedRate(() -> {
        if (remoteServer.isJobDone(jobId)) {
            completionFuture.complete(jobId);
        }
    }, 0, 10, TimeUnit.SECONDS);
    completionFuture.whenComplete((result, thrown) -> {
        checkFuture.cancel(true);
    });
    return completionFuture;
}

在我看来,您比其他任何人都更担心某些文体问题。在java8中,CompletableFuture有2个作用:一个是传统的future,给任务执行和状态查询的异步源;另一个就是我们通常所说的承诺。一个承诺,如果你还不知道,可以被认为是未来的建设者及其完成来源。所以在这种情况下,直觉上需要一个承诺,这正是您在这里使用的情况。你担心的例子是向你介绍第一次使用的东西,而不是承诺方式。

接受这一点,你应该更容易开始处理你的实际问题。我认为这个承诺应该有两个作用,一个是通知你的任务完成轮询,另一个是在完成时取消你的预定任务。这里应该是最终的解决方案:

public CompletableFuture<Object> pollTask(int jobId) {
    CompletableFuture<Object> fut = new CompletableFuture<>();
    ScheduledFuture<?> sfuture = executor.scheduleWithFixedDelay(() -> _poll(jobId, fut), 0, 10, TimeUnit.SECONDS);
    fut.thenAccept(ignore -> sfuture.cancel(false));
    return fut;
}

private void _poll(int jobId, CompletableFuture<Object> fut) {
    // whatever polls
    if (isDone) {
        fut.complete(yourResult);
    }
}

我受 使用 Supplier<Optional<T>> 的启发为此创建了一个通用实用程序,每个轮询都可以 return Optional.empty() 直到值准备就绪。我还实现了 timeout,以便在超过最大时间时抛出 TimeoutException

用法:

ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
Supplier<Optional<String>> supplier = () -> remoteServer.isJobDone(jobId) ? Optional.of(jobId) : Optional.empty();
CompletableFuture<String> future = ScheduledCompletableFuture.builder(String.class)
   .supplier(supplier)
   .executorService(scheduledExecutor)
   .timeUnit(TimeUnit.SECONDS)
   .initialDelay(5)
   .period(5)
   .timeout(60 * 5)
   .build();

ScheduledCompletableFuture.java

public class ScheduledCompletableFuture {
    public static class ScheduledCompletableFutureBuilder<T> {
        private Supplier<Optional<T>> supplier;
        private ScheduledExecutorService executorService;
        private Long initialDelay;
        private Long period;
        private Long timeout;
        private TimeUnit timeUnit;

        public ScheduledCompletableFutureBuilder() {
        }

        public ScheduledCompletableFutureBuilder<T> supplier(Supplier<Optional<T>> supplier) {
            this.supplier = supplier;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> executorService(ScheduledExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> initialDelay(long initialDelay) {
            this.initialDelay = initialDelay;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> period(long period) {
            this.period = period;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> timeout(long timeout) {
            this.timeout = timeout;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> timeUnit(TimeUnit timeUnit) {
            this.timeUnit = timeUnit;
            return this;
        }

        public CompletableFuture<T> build() {
            // take a copy of instance variables so that the Builder can be re-used
            Supplier<Optional<T>> supplier = this.supplier;
            ScheduledExecutorService executorService = this.executorService;
            Long initialDelay = this.initialDelay;
            Long period = this.period;
            Long timeout = this.timeout;
            TimeUnit timeUnit = this.timeUnit;

            CompletableFuture<T> completableFuture = new CompletableFuture<>();
            long endMillis = System.currentTimeMillis() + timeUnit.toMillis(timeout);
            Runnable command = () -> {
                Optional<T> optional = supplier.get();
                if (optional.isPresent()) {
                    completableFuture.complete(optional.get());
                } else if (System.currentTimeMillis() > endMillis) {
                    String msg = String.format("Supplier did not return a value within %s %s", timeout, timeUnit);
                    completableFuture.completeExceptionally(new TimeoutException(msg));
                }
            };
            ScheduledFuture<?> scheduledFuture = executorService.scheduleAtFixedRate(command, initialDelay, period, timeUnit);
            return completableFuture.whenComplete((result, exception) -> scheduledFuture.cancel(true));
        }
    }

    public static <T> ScheduledCompletableFutureBuilder<T> builder(Class<T> type) {
        return new ScheduledCompletableFutureBuilder<>();
    }
}