CompletableFuture :异步调用 void 函数
CompletableFuture : Invoke a void function asynchronusly
我正在尝试对某些数据库异常实施具有重试策略的数据库查询。重试策略的代码不是很相关,所以我没有包含它。正如您在下面的代码中看到的那样——我编写了一个 retryCallable,它采用 populateData()
.
中的重试策略和 Callable
在getDataFromDB
中,我从数据库中获取数据并将数据放入全局哈希图中,该哈希图用作应用程序级别的缓存。
此代码按预期工作。我想从另一个 class 调用 populateData
。但是,这将是一个阻塞调用。由于这是数据库并且有重试策略,这可能会很慢。我想异步调用 populateData
。
如何使用 CompletableFuture 或 FutureTask 来实现这一点?
CompletableFuture.runAsync
需要一个可运行的。 CompletableFuture.supplyAsync
期待供应商。我以前没有实施过这些东西。因此,有关最佳做法的任何建议都会有所帮助。
Class TestCallableRetry {
public void populateData() {
final Callable<Set<String>> retryCallable = new RetryingCallable<>(retryStrategyToRetryOnDBException(), getDataFromDB());
Set<String> data = new HashSet<>();
data = retryCallable.call();
if (data != null && !data.isEmpty()) {
// store data in a global hash map
}
}
private Callable<Set<Building>> getDataFromDB() {
return new Callable<Set<String>>() {
@Override
public Set<String> call() {
// returns data from database
}
};
}
}
Class InvokeCallableAsynchronously {
public void putDataInGlobalMap {
// call populateData asynchronously
}
}
您可以在 CompletableFuture
中组合各种实用方法,真正值得探索所有这些方法。
让我们从populateData
方法开始。根据其名称,您可以推断出它应该从某处接受数据流。
它的签名可能如下所示:
void populateData ( Supplier<? extends Collection<Building> dataSupplier );
Supplier
,顾名思义,它只是为我们提供一些数据的东西。
getDataFromDB()
似乎适合担任 Supplier
角色。
private Set<Building> getDataFromDB() // supply a building's collection
我们希望populateData
执行asynchronously
和return一个结果,无论操作是否正确执行。
所以,在未来,populateData
可能 return,告诉我们事情的进展。
我们把签名改成:
CompletableFuture<Result> populateData(Supplier<? extends Collection<Building>> supplier);
现在让我们看看方法体的样子:
CompletableFuture<Result> populateData(Supplier<? extends Collection<Building>> supplier) {
return CompletableFuture // create new completable future from factory method
.supplyAsync(supplier) // execute the supplier method (getDataFromDB() in our case)
.thenApplyAsync(data -> { // here we can work on the data supplied
if (data == null || data.isEmpty()) return new Result(false);
// some heavy operations
for (Building building : data) {
// do something
}
return new Result(true); // return dummy positive result data
})
.handleAsync((result, throwable) -> {
// check if there was any exception
if (throwable != null) {
// check if exception was thrown
Log.log(throwable);
return new Result(false);
}
return result;
});
}
现在我们可以从某个地方调用 populateData
,并在异步执行完成后应用另一个回调来执行。
populateData(TestCallableRetry::getDataFromDB).thenAccept( result -> {
if ( ! result.success ) {
// things went bad... retry ??
}
});
现在取决于您想如何应用您的重试策略。如果你只关心 re-try 一次,你可以在 thenAcceptAsync
.
内第二次调用 populateData
您还应该 catch
供应商方法中的异常并将它们转换为 java.util.concurrent.CompletionException
,因为它们在 CompletableFuture
.
中得到了顺利处理
如果将 populateData
方法分成两部分,一个 Supplier
用于获取数据,另一个 Consumer
用于存储数据,则很容易将它们与 CompletableFuture
.
// Signature compatible with Supplier<Set<String>>
private Set<String> fetchDataWithRetry() {
final RetryingCallable<Set<String>> retryCallable = new RetryingCallable<>(retryStrategyToRetryOnDBException(), getDataFromDB());
try {
return retryCallable.call();
} catch (Exception e) {
log.error("Call to database failed", e);
return Collections.emptySet();
}
}
// Signature compatible with Consumer<Set<String>>
private void storeData(Set<String> data) {
if (!data.isEmpty()) {
// store data in a global hash map
}
}
然后,在populateData()
:
private ExecutorService executor = Executors.newCachedThreadPool();
public void populateData() {
CompletableFuture
.supplyAsync(this::fetchDataWithRetry, executor)
.thenAccept(this::storeData);
}
使用带有 Executor
的 supplyAsync
版本是可选的。如果您使用单参数版本,您的任务将 运行 在公共池中;短 运行ning 任务可以,但阻塞任务则不行。
它非常简单,因为 java8 只需使用
CompletableFuture.runAsync(() -> object.func());
我正在尝试对某些数据库异常实施具有重试策略的数据库查询。重试策略的代码不是很相关,所以我没有包含它。正如您在下面的代码中看到的那样——我编写了一个 retryCallable,它采用 populateData()
.
在getDataFromDB
中,我从数据库中获取数据并将数据放入全局哈希图中,该哈希图用作应用程序级别的缓存。
此代码按预期工作。我想从另一个 class 调用 populateData
。但是,这将是一个阻塞调用。由于这是数据库并且有重试策略,这可能会很慢。我想异步调用 populateData
。
如何使用 CompletableFuture 或 FutureTask 来实现这一点?
CompletableFuture.runAsync
需要一个可运行的。 CompletableFuture.supplyAsync
期待供应商。我以前没有实施过这些东西。因此,有关最佳做法的任何建议都会有所帮助。
Class TestCallableRetry {
public void populateData() {
final Callable<Set<String>> retryCallable = new RetryingCallable<>(retryStrategyToRetryOnDBException(), getDataFromDB());
Set<String> data = new HashSet<>();
data = retryCallable.call();
if (data != null && !data.isEmpty()) {
// store data in a global hash map
}
}
private Callable<Set<Building>> getDataFromDB() {
return new Callable<Set<String>>() {
@Override
public Set<String> call() {
// returns data from database
}
};
}
}
Class InvokeCallableAsynchronously {
public void putDataInGlobalMap {
// call populateData asynchronously
}
}
您可以在 CompletableFuture
中组合各种实用方法,真正值得探索所有这些方法。
让我们从populateData
方法开始。根据其名称,您可以推断出它应该从某处接受数据流。
它的签名可能如下所示:
void populateData ( Supplier<? extends Collection<Building> dataSupplier );
Supplier
,顾名思义,它只是为我们提供一些数据的东西。
getDataFromDB()
似乎适合担任 Supplier
角色。
private Set<Building> getDataFromDB() // supply a building's collection
我们希望populateData
执行asynchronously
和return一个结果,无论操作是否正确执行。
所以,在未来,populateData
可能 return,告诉我们事情的进展。
我们把签名改成:
CompletableFuture<Result> populateData(Supplier<? extends Collection<Building>> supplier);
现在让我们看看方法体的样子:
CompletableFuture<Result> populateData(Supplier<? extends Collection<Building>> supplier) {
return CompletableFuture // create new completable future from factory method
.supplyAsync(supplier) // execute the supplier method (getDataFromDB() in our case)
.thenApplyAsync(data -> { // here we can work on the data supplied
if (data == null || data.isEmpty()) return new Result(false);
// some heavy operations
for (Building building : data) {
// do something
}
return new Result(true); // return dummy positive result data
})
.handleAsync((result, throwable) -> {
// check if there was any exception
if (throwable != null) {
// check if exception was thrown
Log.log(throwable);
return new Result(false);
}
return result;
});
}
现在我们可以从某个地方调用 populateData
,并在异步执行完成后应用另一个回调来执行。
populateData(TestCallableRetry::getDataFromDB).thenAccept( result -> {
if ( ! result.success ) {
// things went bad... retry ??
}
});
现在取决于您想如何应用您的重试策略。如果你只关心 re-try 一次,你可以在 thenAcceptAsync
.
populateData
您还应该 catch
供应商方法中的异常并将它们转换为 java.util.concurrent.CompletionException
,因为它们在 CompletableFuture
.
如果将 populateData
方法分成两部分,一个 Supplier
用于获取数据,另一个 Consumer
用于存储数据,则很容易将它们与 CompletableFuture
.
// Signature compatible with Supplier<Set<String>>
private Set<String> fetchDataWithRetry() {
final RetryingCallable<Set<String>> retryCallable = new RetryingCallable<>(retryStrategyToRetryOnDBException(), getDataFromDB());
try {
return retryCallable.call();
} catch (Exception e) {
log.error("Call to database failed", e);
return Collections.emptySet();
}
}
// Signature compatible with Consumer<Set<String>>
private void storeData(Set<String> data) {
if (!data.isEmpty()) {
// store data in a global hash map
}
}
然后,在populateData()
:
private ExecutorService executor = Executors.newCachedThreadPool();
public void populateData() {
CompletableFuture
.supplyAsync(this::fetchDataWithRetry, executor)
.thenAccept(this::storeData);
}
使用带有 Executor
的 supplyAsync
版本是可选的。如果您使用单参数版本,您的任务将 运行 在公共池中;短 运行ning 任务可以,但阻塞任务则不行。
它非常简单,因为 java8 只需使用
CompletableFuture.runAsync(() -> object.func());