如何在 Java 中实现非阻塞的未来处理器以便稍后检索处理的结果
How to implement a nonblocking future processor in Java in order to retrieve processed results later
我正在使用具有下面代码的外部库。我发送了很多命令,我对统计结果很感兴趣,以检查调用失败的次数和成功的次数
public Future<CommandResult> sendCommand(Command command) {
return command.execute();
}
CommandResult can be success or failure
但是,如果我使用client.sendCommand(command).get();
那么,我正在同步等待结果,同时应用程序被阻止。
我只想稍后检查(30 秒后调用成功和失败)。我保证在 10 秒内得到答复。
问题是应用程序等待计算完成,然后检索其结果。
我是根据以下答案考虑这种方法的:
List<Future< CommandResult >> futures = new ArrayList<>();
for(Command command: commands) {
futures.add(client.sendCommand(command));
}
//in a scheduler, 30+ seconds later
for (Future<Boolean> future : futures) {
saveResult(future.get());
}
Future
是遗留的 java 功能,不允许反应式非阻塞功能。 CompletableFuture
是 Java 中的后期增强功能,以允许此类反应性非阻塞功能。
您可以基于 this previous SO answer 尝试将您的 Future
转换为 CompletableFuture
然后您将公开方法以利用非阻塞执行。
检查以下示例并进行相应修改。
public class Application {
public static void main(String[] args) throws ParseException {
Future future = new SquareCalculator().calculate(10);
CompletableFuture<Integer> completableFuture = makeCompletableFuture(future);
System.out.println("before apply");
completableFuture.thenApply(s -> {
System.out.println(s);
return s;
});
System.out.println("after apply method");
}
public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
if (future.isDone())
return transformDoneFuture(future);
return CompletableFuture.supplyAsync(() -> {
try {
if (!future.isDone())
awaitFutureIsDoneInForkJoinPool(future);
return future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
// Normally, this should never happen inside ForkJoinPool
Thread.currentThread().interrupt();
// Add the following statement if the future doesn't have side effects
// future.cancel(true);
throw new RuntimeException(e);
}
});
}
private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
CompletableFuture<T> cf = new CompletableFuture<>();
T result;
try {
result = future.get();
} catch (Throwable ex) {
cf.completeExceptionally(ex);
return cf;
}
cf.complete(result);
return cf;
}
private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
throws InterruptedException {
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
@Override public boolean block() throws InterruptedException {
try {
future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
return true;
}
@Override public boolean isReleasable() {
return future.isDone();
}
});
}
}
然后class创建一个示例Future
public class SquareCalculator {
private ExecutorService executor
= Executors.newSingleThreadExecutor();
public Future<Integer> calculate(Integer input) {
return executor.submit(() -> {
Thread.sleep(1000);
return input * input;
});
}
}
将结果变成
I would like to check only later (after 30 seconds which calls succeeded and which failed). I am guaranteed to get an answer in 10 seconds. The problem is that the app waits for the computation to complete, and then retrieves its result.
如果您想稍后检查结果,那么使用 Future<Boolean>
的解决方案应该没问题。作业将在后台 运行,当您调用 future.get()
时,您将获得结果表格。但是,每个 get()
调用都会阻塞。
如果您想在收到结果时获得结果,我会使用 ExecutorCompletionService
,您可以随时对其进行轮询以查看是否有结果。投票结果为 non-blocking.
// create your thread pool using fixed or other pool
Executor<Result> threadPool = Executors.newFixedThreadPool(5);
// wrap the Executor in a CompletionService
CompletionService<Boolean> completionService =
new ExecutorCompletionService<>(e);
// submit jobs to the pool through the ExecutorCompletionService
for (Job job : jobs) {
completionService.submit(job);
}
// after we have submitted all of the jobs we can shutdown the Executor
// the jobs submitted will continue to run
threadPool.shutdown();
...
// some point later you can do
int jobsRunning = jobs.size();
for (int jobsRunning = jobs.size(); jobsRunning > 0; ) {
// do some processing ...
// are any results available?
Boolean result = completionService.poll();
if (result != null) {
// process a result if available
jobsRunning--;
}
}
请注意,您需要跟踪提交给 CompletionService
的职位数量。
如果将 Future
实例转换为 CompletableFuture
(参见 的回答)是一个选项,那么您可以实现一个简单的辅助函数来转换 Stream<CompletableFuture<T>>
进入 CompletableFuture<Stream<T>>
:
public static <T> CompletableFuture<Stream<T>> collect(Stream<CompletableFuture<T>> futures) {
return futures
.map(future -> future.thenApply(Stream::of))
.reduce(
CompletableFuture.completedFuture(Stream.empty()),
(future1, future2) ->
future1
.thenCompose(stream1 ->
future2
.thenApply(stream2 ->
concat(stream1, stream2)))
);
}
从本质上讲,这减少了与流的未来并行的未来流。
如果你使用这个,例如在字符串的期货流上,它将 return 一个在最后一个单个期货完成后完成的未来:
Stream<CompletableFuture<String>> streamOfFutures = ...
CompletableFuture<Stream<String>> futureOfStream = collect(streamOfFutures);
// Prints a list of strings once the "slowest" future completed
System.out.println(futureOfStream.get().toList());
我正在使用具有下面代码的外部库。我发送了很多命令,我对统计结果很感兴趣,以检查调用失败的次数和成功的次数
public Future<CommandResult> sendCommand(Command command) {
return command.execute();
}
CommandResult can be success or failure
但是,如果我使用client.sendCommand(command).get();
那么,我正在同步等待结果,同时应用程序被阻止。
我只想稍后检查(30 秒后调用成功和失败)。我保证在 10 秒内得到答复。 问题是应用程序等待计算完成,然后检索其结果。
我是根据以下答案考虑这种方法的:
List<Future< CommandResult >> futures = new ArrayList<>();
for(Command command: commands) {
futures.add(client.sendCommand(command));
}
//in a scheduler, 30+ seconds later
for (Future<Boolean> future : futures) {
saveResult(future.get());
}
Future
是遗留的 java 功能,不允许反应式非阻塞功能。 CompletableFuture
是 Java 中的后期增强功能,以允许此类反应性非阻塞功能。
您可以基于 this previous SO answer 尝试将您的 Future
转换为 CompletableFuture
然后您将公开方法以利用非阻塞执行。
检查以下示例并进行相应修改。
public class Application {
public static void main(String[] args) throws ParseException {
Future future = new SquareCalculator().calculate(10);
CompletableFuture<Integer> completableFuture = makeCompletableFuture(future);
System.out.println("before apply");
completableFuture.thenApply(s -> {
System.out.println(s);
return s;
});
System.out.println("after apply method");
}
public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
if (future.isDone())
return transformDoneFuture(future);
return CompletableFuture.supplyAsync(() -> {
try {
if (!future.isDone())
awaitFutureIsDoneInForkJoinPool(future);
return future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
// Normally, this should never happen inside ForkJoinPool
Thread.currentThread().interrupt();
// Add the following statement if the future doesn't have side effects
// future.cancel(true);
throw new RuntimeException(e);
}
});
}
private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
CompletableFuture<T> cf = new CompletableFuture<>();
T result;
try {
result = future.get();
} catch (Throwable ex) {
cf.completeExceptionally(ex);
return cf;
}
cf.complete(result);
return cf;
}
private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
throws InterruptedException {
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
@Override public boolean block() throws InterruptedException {
try {
future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
return true;
}
@Override public boolean isReleasable() {
return future.isDone();
}
});
}
}
然后class创建一个示例Future
public class SquareCalculator {
private ExecutorService executor
= Executors.newSingleThreadExecutor();
public Future<Integer> calculate(Integer input) {
return executor.submit(() -> {
Thread.sleep(1000);
return input * input;
});
}
}
将结果变成
I would like to check only later (after 30 seconds which calls succeeded and which failed). I am guaranteed to get an answer in 10 seconds. The problem is that the app waits for the computation to complete, and then retrieves its result.
如果您想稍后检查结果,那么使用 Future<Boolean>
的解决方案应该没问题。作业将在后台 运行,当您调用 future.get()
时,您将获得结果表格。但是,每个 get()
调用都会阻塞。
如果您想在收到结果时获得结果,我会使用 ExecutorCompletionService
,您可以随时对其进行轮询以查看是否有结果。投票结果为 non-blocking.
// create your thread pool using fixed or other pool
Executor<Result> threadPool = Executors.newFixedThreadPool(5);
// wrap the Executor in a CompletionService
CompletionService<Boolean> completionService =
new ExecutorCompletionService<>(e);
// submit jobs to the pool through the ExecutorCompletionService
for (Job job : jobs) {
completionService.submit(job);
}
// after we have submitted all of the jobs we can shutdown the Executor
// the jobs submitted will continue to run
threadPool.shutdown();
...
// some point later you can do
int jobsRunning = jobs.size();
for (int jobsRunning = jobs.size(); jobsRunning > 0; ) {
// do some processing ...
// are any results available?
Boolean result = completionService.poll();
if (result != null) {
// process a result if available
jobsRunning--;
}
}
请注意,您需要跟踪提交给 CompletionService
的职位数量。
如果将 Future
实例转换为 CompletableFuture
(参见 Stream<CompletableFuture<T>>
进入 CompletableFuture<Stream<T>>
:
public static <T> CompletableFuture<Stream<T>> collect(Stream<CompletableFuture<T>> futures) {
return futures
.map(future -> future.thenApply(Stream::of))
.reduce(
CompletableFuture.completedFuture(Stream.empty()),
(future1, future2) ->
future1
.thenCompose(stream1 ->
future2
.thenApply(stream2 ->
concat(stream1, stream2)))
);
}
从本质上讲,这减少了与流的未来并行的未来流。
如果你使用这个,例如在字符串的期货流上,它将 return 一个在最后一个单个期货完成后完成的未来:
Stream<CompletableFuture<String>> streamOfFutures = ...
CompletableFuture<Stream<String>> futureOfStream = collect(streamOfFutures);
// Prints a list of strings once the "slowest" future completed
System.out.println(futureOfStream.get().toList());