Java 11 Completable Future 重试自定义条件而不是异常

Java 11 Completable Future retry on a custom condition and not on exception

我有一个完整的未来定义如下

CompletableFuture<Person> personFutures = personService.getPersons();

现在,根据特定条件,我需要检查并调用 getPersons,直到条件匹配或重试次数(5 次重试,间隔 5 秒)完成。

条件为

if(personFutures.get().size() != totalPersonsInOrg){
 retry(personService.getPersons(), 5, 5)
} else {
 return persons
}

我想在第一个 completablefuture 之后使用 thenApply 和 thenCompose 将它们链接起来。

personFutures.thenApply(persons -> {
     if(persons.size() != totalPersonsOrg){
      retry(personservice,5,5)
     }
})

这是需要更改的内容

private boolean allPersonsFound(String id, int retry, int c 
         count) 
{ 
    if (retry > maxRetries) {
        return false;
    }

     CompletableFuture<List<Persons>> personsFuture = personaService.getPersons();
    List<Persons> persons = personsFuture.get();

    if (persons.size() != count) {
        //add delay of 50ms
        return allPersonsFound(id, retry++, count);
    }
    return true;
}

假设您的 PersonsService 是:

interface PersonsService {
   CompletableFuture<Persons> getPersons();
}

您可能想要一个带有额外验证和重试逻辑的代理实现。

一种可能是使用异步递归。像这样的东西(我还没试过 运行 呢!):

final class ValidatedPersonsService implements PersonsService {
    private final PersonsService upstreamService;
    private final Predicate<Persons> validationPredicate;
    private final int numberOfAttempts;
    private final long backoffDuratioMs;
    private final Executor executor;
    private final Executor delayedExecutor;

    ValidatedPersonsService(final PersonsService upstreamService,
                            final Predicate<Persons> validationPredicate,
                            final int numberOfAttempts,
                            final long backoffDuratioMs,
                            final Executor executor) {
        this.upstreamService = upstreamService;
        this.validationPredicate = validationPredicate;
        this.numberOfAttempts = numberOfAttempts;
        this.backoffDuratioMs = backoffDuratioMs;
        this.executor = executor;
        this.delayedExecutor = CompletableFuture.delayedExecutor(backoffDuratioMs, TimeUnit.MILLISECONDS, executor);
    }

    // this one is needed to track number of passed attempts through the async recursion steps
    private static final class PersonsResponse {
        final Persons persons;
        final int attempt;

        private PersonsResponse(final Persons persons, final int attempt) {
            this.persons = persons;
            this.attempt = attempt;
        }
    }

    @Override
    public CompletableFuture<Persons> getPersons() {
        return submitRequest(1, executor)
                .thenApply(response -> response.persons);
    }

    private CompletableFuture<PersonsResponse> submitRequest(int currentAttempt, Executor executor) {
        if (currentAttempt > numberOfAttempts) {
            return CompletableFuture.failedFuture(new RuntimeException("max number of attempts exceeded"));
        } else {
            return upstreamService
                    .getPersons()
                    .thenApplyAsync(persons -> new PersonsResponse(persons, currentAttempt), executor) // break out into our executor, to not rely on concurrency model of the upstream service
                    .thenCompose(this::validateResponse);
        }
    }

    private CompletableFuture<PersonsResponse> validateResponse(PersonsResponse response) {
        if (validationPredicate.test(response.persons)) {
            return CompletableFuture.completedFuture(response);
        } else {
            return submitRequest(response.attempt + 1, delayedExecutor);
        }
    }
}