Java 11 CompletableFuture:根据自定义条件(而不是异常)进行重试
Java 11 Completable Future retry on a custom condition and not on exception
我有一个 CompletableFuture,定义如下:
CompletableFuture<Person> personFutures = personService.getPersons();
现在,我需要根据特定条件进行检查并重新调用 getPersons,直到条件满足或重试次数用完(最多重试 5 次,每次间隔 5 秒)。
条件为
if(personFutures.get().size() != totalPersonsInOrg){
retry(personService.getPersons(), 5, 5)
} else {
return persons
}
我想在第一个 CompletableFuture 之后使用 thenApply 和 thenCompose 把这些调用链接起来。
personFutures.thenApply(persons -> {
if(persons.size() != totalPersonsOrg){
retry(personservice,5,5)
}
})
下面是需要修改的现有代码:
private boolean allPersonsFound(String id, int retry, int c
count)
{
if (retry > maxRetries) {
return false;
}
CompletableFuture<List<Persons>> personsFuture = personaService.getPersons();
List<Persons> persons = personsFuture.get();
if (persons.size() != count) {
//add delay of 50ms
return allPersonsFound(id, retry++, count);
}
return true;
}
假设您的 PersonsService 定义如下:
/** Asynchronous source of {@code Persons} data. */
interface PersonsService {
/**
 * Requests the persons.
 *
 * @return a future that yields the {@code Persons} result
 */
CompletableFuture<Persons> getPersons();
}
您可能想要一个带有额外验证和重试逻辑的代理实现。
一种做法是使用异步递归,大致如下(这段代码我还没有实际运行过!):
/**
 * {@link PersonsService} decorator that re-queries an upstream service until the returned
 * {@code Persons} passes a validation predicate or the attempt budget is exhausted.
 *
 * <p>Retries are driven by asynchronous recursion: no thread is blocked while waiting for
 * the upstream future or for the backoff delay. When all attempts fail validation, the
 * returned future completes exceptionally with a {@link RuntimeException}.
 */
final class ValidatedPersonsService implements PersonsService {
    private final PersonsService upstreamService;
    private final Predicate<Persons> validationPredicate;
    private final int numberOfAttempts;
    private final long backoffDurationMs;
    private final Executor executor;
    private final Executor delayedExecutor;

    /**
     * @param upstreamService     service that is actually queried
     * @param validationPredicate returns {@code true} for a result that may be handed to the caller
     * @param numberOfAttempts    maximum number of upstream calls, including the first one
     * @param backoffDurationMs   pause between a rejected result and the next upstream call, in milliseconds
     * @param executor            executor on which validation and retry scheduling run
     */
    ValidatedPersonsService(final PersonsService upstreamService,
                            final Predicate<Persons> validationPredicate,
                            final int numberOfAttempts,
                            final long backoffDurationMs,
                            final Executor executor) {
        this.upstreamService = upstreamService;
        this.validationPredicate = validationPredicate;
        this.numberOfAttempts = numberOfAttempts;
        this.backoffDurationMs = backoffDurationMs;
        this.executor = executor;
        this.delayedExecutor =
                CompletableFuture.delayedExecutor(backoffDurationMs, TimeUnit.MILLISECONDS, executor);
    }

    /** Carries the attempt number alongside the result through the async recursion steps. */
    private static final class PersonsResponse {
        final Persons persons;
        final int attempt;

        private PersonsResponse(final Persons persons, final int attempt) {
            this.persons = persons;
            this.attempt = attempt;
        }
    }

    /**
     * Queries the upstream service, retrying with a backoff while the result fails validation.
     *
     * @return a future yielding the first result accepted by the predicate, or failing with a
     *         {@link RuntimeException} once the attempt budget is exceeded
     */
    @Override
    public CompletableFuture<Persons> getPersons() {
        return submitRequest(1).thenApply(response -> response.persons);
    }

    /** Issues one upstream call for the given attempt number and validates its result. */
    private CompletableFuture<PersonsResponse> submitRequest(final int currentAttempt) {
        if (currentAttempt > numberOfAttempts) {
            return attemptsExhausted();
        }
        return upstreamService
                .getPersons()
                // Hop onto our own executor so we do not depend on the upstream's threading model.
                .thenApplyAsync(persons -> new PersonsResponse(persons, currentAttempt), executor)
                .thenCompose(this::validateResponse);
    }

    /** Accepts a valid response; otherwise waits for the backoff and then starts the next attempt. */
    private CompletableFuture<PersonsResponse> validateResponse(final PersonsResponse response) {
        if (validationPredicate.test(response.persons)) {
            return CompletableFuture.completedFuture(response);
        }
        final int nextAttempt = response.attempt + 1;
        if (nextAttempt > numberOfAttempts) {
            // Nothing left to retry: fail right away instead of sitting out a pointless backoff.
            return attemptsExhausted();
        }
        // The delay must elapse BEFORE the upstream is queried again. Passing the delayed
        // executor to thenApplyAsync (as was done previously) fires the request immediately
        // and merely postpones validation of an already-fetched, by then stale, response.
        return CompletableFuture
                .runAsync(() -> { }, delayedExecutor)
                .thenCompose(ignored -> submitRequest(nextAttempt));
    }

    /** Builds the failed future that signals the attempt budget has been used up. */
    private static <T> CompletableFuture<T> attemptsExhausted() {
        return CompletableFuture.failedFuture(new RuntimeException("max number of attempts exceeded"));
    }
}
我有一个 CompletableFuture,定义如下:
CompletableFuture<Person> personFutures = personService.getPersons();
现在,我需要根据特定条件进行检查并重新调用 getPersons,直到条件满足或重试次数用完(最多重试 5 次,每次间隔 5 秒)。
条件为
if(personFutures.get().size() != totalPersonsInOrg){
retry(personService.getPersons(), 5, 5)
} else {
return persons
}
我想在第一个 CompletableFuture 之后使用 thenApply 和 thenCompose 把这些调用链接起来。
personFutures.thenApply(persons -> {
if(persons.size() != totalPersonsOrg){
retry(personservice,5,5)
}
})
下面是需要修改的现有代码:
private boolean allPersonsFound(String id, int retry, int c
count)
{
if (retry > maxRetries) {
return false;
}
CompletableFuture<List<Persons>> personsFuture = personaService.getPersons();
List<Persons> persons = personsFuture.get();
if (persons.size() != count) {
//add delay of 50ms
return allPersonsFound(id, retry++, count);
}
return true;
}
假设您的 PersonsService 定义如下:
/** Asynchronous source of {@code Persons} data. */
interface PersonsService {
/**
 * Requests the persons.
 *
 * @return a future that yields the {@code Persons} result
 */
CompletableFuture<Persons> getPersons();
}
您可能想要一个带有额外验证和重试逻辑的代理实现。
一种做法是使用异步递归,大致如下(这段代码我还没有实际运行过!):
/**
 * {@link PersonsService} decorator that re-queries an upstream service until the returned
 * {@code Persons} passes a validation predicate or the attempt budget is exhausted.
 *
 * <p>Retries are driven by asynchronous recursion: no thread is blocked while waiting for
 * the upstream future or for the backoff delay. When all attempts fail validation, the
 * returned future completes exceptionally with a {@link RuntimeException}.
 */
final class ValidatedPersonsService implements PersonsService {
    private final PersonsService upstreamService;
    private final Predicate<Persons> validationPredicate;
    private final int numberOfAttempts;
    private final long backoffDurationMs;
    private final Executor executor;
    private final Executor delayedExecutor;

    /**
     * @param upstreamService     service that is actually queried
     * @param validationPredicate returns {@code true} for a result that may be handed to the caller
     * @param numberOfAttempts    maximum number of upstream calls, including the first one
     * @param backoffDurationMs   pause between a rejected result and the next upstream call, in milliseconds
     * @param executor            executor on which validation and retry scheduling run
     */
    ValidatedPersonsService(final PersonsService upstreamService,
                            final Predicate<Persons> validationPredicate,
                            final int numberOfAttempts,
                            final long backoffDurationMs,
                            final Executor executor) {
        this.upstreamService = upstreamService;
        this.validationPredicate = validationPredicate;
        this.numberOfAttempts = numberOfAttempts;
        this.backoffDurationMs = backoffDurationMs;
        this.executor = executor;
        this.delayedExecutor =
                CompletableFuture.delayedExecutor(backoffDurationMs, TimeUnit.MILLISECONDS, executor);
    }

    /** Carries the attempt number alongside the result through the async recursion steps. */
    private static final class PersonsResponse {
        final Persons persons;
        final int attempt;

        private PersonsResponse(final Persons persons, final int attempt) {
            this.persons = persons;
            this.attempt = attempt;
        }
    }

    /**
     * Queries the upstream service, retrying with a backoff while the result fails validation.
     *
     * @return a future yielding the first result accepted by the predicate, or failing with a
     *         {@link RuntimeException} once the attempt budget is exceeded
     */
    @Override
    public CompletableFuture<Persons> getPersons() {
        return submitRequest(1).thenApply(response -> response.persons);
    }

    /** Issues one upstream call for the given attempt number and validates its result. */
    private CompletableFuture<PersonsResponse> submitRequest(final int currentAttempt) {
        if (currentAttempt > numberOfAttempts) {
            return attemptsExhausted();
        }
        return upstreamService
                .getPersons()
                // Hop onto our own executor so we do not depend on the upstream's threading model.
                .thenApplyAsync(persons -> new PersonsResponse(persons, currentAttempt), executor)
                .thenCompose(this::validateResponse);
    }

    /** Accepts a valid response; otherwise waits for the backoff and then starts the next attempt. */
    private CompletableFuture<PersonsResponse> validateResponse(final PersonsResponse response) {
        if (validationPredicate.test(response.persons)) {
            return CompletableFuture.completedFuture(response);
        }
        final int nextAttempt = response.attempt + 1;
        if (nextAttempt > numberOfAttempts) {
            // Nothing left to retry: fail right away instead of sitting out a pointless backoff.
            return attemptsExhausted();
        }
        // The delay must elapse BEFORE the upstream is queried again. Passing the delayed
        // executor to thenApplyAsync (as was done previously) fires the request immediately
        // and merely postpones validation of an already-fetched, by then stale, response.
        return CompletableFuture
                .runAsync(() -> { }, delayedExecutor)
                .thenCompose(ignored -> submitRequest(nextAttempt));
    }

    /** Builds the failed future that signals the attempt budget has been used up. */
    private static <T> CompletableFuture<T> attemptsExhausted() {
        return CompletableFuture.failedFuture(new RuntimeException("max number of attempts exceeded"));
    }
}