如何在 Java 中实现非阻塞的未来处理器以便稍后检索处理的结果

How to implement a nonblocking future processor in Java in order to retrieve processed results later

我正在使用具有下面代码的外部库。我发送了很多命令,我对统计结果很感兴趣,以检查调用失败的次数和成功的次数

public Future<CommandResult> sendCommand(Command command) {
    return command.execute();
}
CommandResult can be success or failure

但是,如果我使用client.sendCommand(command).get();那么,我正在同步等待结果,同时应用程序被阻止。

我只想稍后检查(30 秒后调用成功和失败)。我保证在 10 秒内得到答复。 问题是应用程序等待计算完成,然后检索其结果。

我是根据以下答案考虑这种方法的:

List<Future< CommandResult >> futures = new ArrayList<>();
for(Command command: commands) {
   futures.add(client.sendCommand(command)); 
} 

//in a scheduler, 30+ seconds later 
for (Future<Boolean> future : futures) {  
   saveResult(future.get());
} 

Future 是遗留的 java 功能,不允许反应式非阻塞功能。 CompletableFuture 是 Java 中的后期增强功能,以允许此类反应性非阻塞功能。

您可以基于 this previous SO answer 尝试将您的 Future 转换为 CompletableFuture 然后您将公开方法以利用非阻塞执行。

检查以下示例并进行相应修改。

public class Application {

    public static void main(String[] args) throws ParseException {

        Future future =  new SquareCalculator().calculate(10);
        CompletableFuture<Integer> completableFuture = makeCompletableFuture(future);
        System.out.println("before apply");
        completableFuture.thenApply(s -> {
            System.out.println(s);
            return s;
        });
        System.out.println("after apply method");
    }


    public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
        if (future.isDone())
            return transformDoneFuture(future);
        return CompletableFuture.supplyAsync(() -> {
            try {
                if (!future.isDone())
                    awaitFutureIsDoneInForkJoinPool(future);
                return future.get();
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            } catch (InterruptedException e) {
                // Normally, this should never happen inside ForkJoinPool
                Thread.currentThread().interrupt();
                // Add the following statement if the future doesn't have side effects
                // future.cancel(true);
                throw new RuntimeException(e);
            }
        });
    }

    private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
        CompletableFuture<T> cf = new CompletableFuture<>();
        T result;
        try {
            result = future.get();
        } catch (Throwable ex) {
            cf.completeExceptionally(ex);
            return cf;
        }
        cf.complete(result);
        return cf;
    }

    private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
            throws InterruptedException {
        ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
            @Override public boolean block() throws InterruptedException {
                try {
                    future.get();
                } catch (ExecutionException e) {
                    throw new RuntimeException(e);
                }
                return true;
            }
            @Override public boolean isReleasable() {
                return future.isDone();
            }
        });
    }
}

然后class创建一个示例Future

public class SquareCalculator {

    private ExecutorService executor
            = Executors.newSingleThreadExecutor();

    public Future<Integer> calculate(Integer input) {
        return executor.submit(() -> {
            Thread.sleep(1000);
            return input * input;
        });
    }
}

将结果变成

I would like to check only later (after 30 seconds which calls succeeded and which failed). I am guaranteed to get an answer in 10 seconds. The problem is that the app waits for the computation to complete, and then retrieves its result.

如果您想稍后检查结果,那么使用 Future<Boolean> 的解决方案应该没问题。作业将在后台 运行,当您调用 future.get() 时,您将获得结果表格。但是,每个 get() 调用都会阻塞。

如果您想在收到结果时获得结果,我会使用 ExecutorCompletionService,您可以随时对其进行轮询以查看是否有结果。投票结果为 non-blocking.

// create your thread pool using fixed or other pool
Executor<Result> threadPool = Executors.newFixedThreadPool(5);
// wrap the Executor in a CompletionService
CompletionService<Boolean> completionService =
   new ExecutorCompletionService<>(e);
// submit jobs to the pool through the ExecutorCompletionService
for (Job job : jobs) {
   completionService.submit(job);
}
// after we have submitted all of the jobs we can shutdown the Executor
// the jobs submitted will continue to run
threadPool.shutdown();
...
// some point later you can do
int jobsRunning = jobs.size();
for (int jobsRunning = jobs.size(); jobsRunning > 0; ) {
   // do some processing ...
   // are any results available?
   Boolean result = completionService.poll();
   if (result != null) {
      // process a result if available
      jobsRunning--;
   }
}

请注意,您需要跟踪提交给 CompletionService 的职位数量。

如果将 Future 实例转换为 CompletableFuture(参见 的回答)是一个选项,那么您可以实现一个简单的辅助函数来转换 Stream<CompletableFuture<T>>进入 CompletableFuture<Stream<T>>:

public static <T> CompletableFuture<Stream<T>> collect(Stream<CompletableFuture<T>> futures) {
    return futures
        .map(future -> future.thenApply(Stream::of))
        .reduce(
            CompletableFuture.completedFuture(Stream.empty()),
            (future1, future2) ->
            future1
                .thenCompose(stream1 ->
            future2
                .thenApply(stream2 ->
            concat(stream1, stream2)))
        );
}

从本质上讲,这减少了与流的未来并行的未来流。

如果你使用这个,例如在字符串的期货流上,它将 return 一个在最后一个单个期货完成后完成的未来:

Stream<CompletableFuture<String>> streamOfFutures = ...
CompletableFuture<Stream<String>> futureOfStream = collect(streamOfFutures);

// Prints a list of strings once the "slowest" future completed
System.out.println(futureOfStream.get().toList());