How to remove this recursion that uses CompletableFuture

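I originally wrote this code recursively because that is how I see the world, but I would like to look into how to remove the recursion.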

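The idea is that I have a list of instructions to render. The renderer gives me a CompletableFuture of a result for a single instruction only. The result can sometimes be invalid, in which case I want to call the renderer on the next instruction. I want to return the result of the first instruction that renders a valid result.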

I would appreciate any suggestions.

/**
 * I want a completable future for a list of instructions, returning
 * me the first valid result for the instruction input.
 */ 
private CompletableFuture<Result> getFutureResult(final List<Instruction> instructions) {
    final CompletableFuture<Result> futureResult = new CompletableFuture<>();
    renderAux(futureResult, instructions.iterator());
    return futureResult;
}

/**
 * This is actually an external method that will return a completable
 * future for only one instruction. I cannot modify this API.
 */
private CompletableFuture<Result> render(final Instruction instruction) {
    /.../
}

/**
 * This method is called recursively for the current instruction in the
 * iterator; if the result is not valid, we call it on the next instruction.
 */
private void renderAux(
        final CompletableFuture<Result> futureResult,
        final Iterator<Instruction> instructionIterator) {

    final Instruction instruction = instructionIterator.next();
    final boolean isLast = !instructionIterator.hasNext();
    render(instruction).whenComplete((result, e) -> {
        if (e != null) {
            // Propagate a failed render; result is null in this case.
            futureResult.completeExceptionally(e);
        } else if (result.isValid() || isLast) {
            // Complete the future on the first valid result, or
            // if this is the last instruction.
            futureResult.complete(result);
        } else {
            // Recursive call for the next instruction.
            renderAux(futureResult, instructionIterator);
        }
    });
}

You can simply chain the futures together with the CompletableFuture.handle() method:

private CompletableFuture<Result> getFutureResult(final List<Instruction> instructions) {
    CompletableFuture<Result> futureResult = null;

    for (Instruction instruction : instructions) {
        if (futureResult == null) {
            futureResult = render(instruction);
        } else {
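            futureResult = futureResult.handle((result, e) -> {
                if (e != null) {
                    // Rethrow a failed render; result is null in this case.
                    // (Requires java.util.concurrent.CompletionException.)
                    throw (e instanceof CompletionException)
                            ? (CompletionException) e
                            : new CompletionException(e);
                }
                if (result.isValid()) {
                    return result;
                }
                // Render the next instruction only when the previous result is invalid.
                return render(instruction).join();
            });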
        }
    }

    return futureResult;
}

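This creates an extra future for each instruction in the list, but it still does not render more instructions than necessary. Note that join() blocks the thread running the callback until the next render completes.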
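As a variation on the loop above, the same chain can probably be built with thenCompose, so that no callback has to block on join(). This is only a sketch under assumptions: the class name FirstValidChain, the generic helper firstValid, and the Integer stand-ins for Instruction and Result are made up for illustration, and completing with null for an empty list is my own choice rather than something the original code specifies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

public class FirstValidChain {
    /**
     * Hypothetical helper: chains the renders with thenCompose, so the next
     * item is rendered only when the previous result failed the check.
     * Yields the last result if none is valid, and null for an empty list.
     */
    public static <T, R> CompletableFuture<R> firstValid(
            List<T> items,
            Function<T, CompletableFuture<R>> render,
            Predicate<R> isValid) {
        CompletableFuture<R> chain = null;
        for (T item : items) {
            if (chain == null) {
                chain = render.apply(item);
            } else {
                chain = chain.thenCompose(result ->
                        isValid.test(result)
                                ? CompletableFuture.completedFuture(result)
                                : render.apply(item));
            }
        }
        return chain == null ? CompletableFuture.completedFuture(null) : chain;
    }

    public static void main(String[] args) {
        List<Integer> rendered = new ArrayList<>();
        // Stand-in for render(): records the call and completes immediately.
        Function<Integer, CompletableFuture<Integer>> render = item -> {
            rendered.add(item);
            return CompletableFuture.completedFuture(item);
        };
        Predicate<Integer> isEven = value -> value % 2 == 0;

        Integer result = firstValid(Arrays.asList(1, 3, 4, 5), render, isEven).join();
        System.out.println(result);   // 4
        System.out.println(rendered); // [1, 3, 4]
    }
}
```

Because thenCompose skips its function when the previous stage failed, an exception from any render simply propagates to the returned future.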

What you are doing is like CompletableFuture.anyOf, but completing the future with your own condition, so I named my method anyOf. I have implemented two versions of anyOf: one matches the results in sequence, the other matches the results in parallel. After implementing my own anyOf method, I found it interesting that CompletableFuture is much like an ES2015 Promise or a jQuery Deferred.
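For comparison, here is a small sketch of the built-in CompletableFuture.anyOf, which has no notion of a condition: it completes with whichever future finishes first. The class name BuiltInAnyOf, the helper firstCompleted, and the string values are illustrative assumptions only.

```java
import java.util.concurrent.CompletableFuture;

public class BuiltInAnyOf {
    /** Hypothetical helper: waits for whichever future completes first. */
    public static Object firstCompleted(CompletableFuture<?>... futures) {
        // anyOf returns CompletableFuture<Object>, so the result type is lost.
        return CompletableFuture.anyOf(futures).join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> done = CompletableFuture.completedFuture("invalid");

        // The already completed future wins, whether or not its value is "valid".
        System.out.println(firstCompleted(pending, done)); // invalid
    }
}
```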

Usage

private CompletableFuture<Result> getFutureResult(List<Instruction> instructions) {
    return anyOf(instructions, this::render, Result::isValid);
}

If you want to get the valid result from the instructions in sequence, you can use the synchronous approach below:

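The sequential algorithm is implemented with the CompletableFuture.thenCombine method. Note that promiseMapping is applied to every item up front, so all futures are started; only the selection of the result happens in list order.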

private <T, R> CompletableFuture<R> anyOf(List<T> items
        , Function<T, CompletableFuture<R>> promiseMapping
        , Predicate<R> condition) {

    CompletableFuture<R> result = CompletableFuture.completedFuture(null);
    for (T it : items) {
        result = result.thenCombine(promiseMapping.apply(it), (own, other) -> {
            if (own != null && condition.test(own)) {
                return own;
            }
            return condition.test(other) ? other : null;
        });
    }
    return result;
}
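To illustrate the ordering behavior of the thenCombine loop above, here is a minimal runnable sketch. The class name SequentialAnyOfDemo and the use of pre-built String futures as the "items" are assumptions made for the demo; the anyOf body is the same loop, only made static so that main can call it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

public class SequentialAnyOfDemo {
    // Same thenCombine loop as above, made static for the demo.
    public static <T, R> CompletableFuture<R> anyOf(
            List<T> items,
            Function<T, CompletableFuture<R>> promiseMapping,
            Predicate<R> condition) {
        CompletableFuture<R> result = CompletableFuture.completedFuture(null);
        for (T it : items) {
            result = result.thenCombine(promiseMapping.apply(it), (own, other) -> {
                if (own != null && condition.test(own)) {
                    return own; // an earlier item already matched
                }
                return condition.test(other) ? other : null;
            });
        }
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        Function<CompletableFuture<String>, CompletableFuture<String>> asIs = future -> future;
        Predicate<String> alwaysValid = value -> true;

        CompletableFuture<String> result = anyOf(Arrays.asList(slow, fast), asIs, alwaysValid);
        // The later future is already done, but the chain waits for the earlier one.
        System.out.println(result.isDone()); // false
        slow.complete("slow");
        System.out.println(result.join());   // slow
    }
}
```

The first item in list order wins even though a later one finished earlier, which is the trade-off of the sequential version.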

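If you want to get a valid result from the instructions in parallel, regardless of which valid result is returned, you can use the asynchronous approach below; see the Tests section for more details: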

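The parallel algorithm is implemented with the CompletableFuture.thenCompose method. Note that, as the tests show, for a non-empty list the returned future never completes when no result satisfies the condition.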

private <T, R> CompletableFuture<R> anyOf(List<T> items
        , Function<T, CompletableFuture<R>> promiseMapping
        , Predicate<R> condition) {
    return items.parallelStream()
            .map(promiseMapping)
            .map(completeWhenMatching(condition))
            .reduce(doneWhenAnyCompleted())
            .orElse(completedFuture(null));
}
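The snippet above relies on two helpers, completeWhenMatching and doneWhenAnyCompleted, which appear only in the test class. The following self-contained sketch puts them next to anyOf so that it can be run on its own. The class name ParallelAnyOfDemo and the Integer stand-ins are assumptions, and a plain stream() is used instead of parallelStream() purely to keep the demo simple.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;

public class ParallelAnyOfDemo {
    public static <T, R> CompletableFuture<R> anyOf(
            List<T> items,
            Function<T, CompletableFuture<R>> promiseMapping,
            Predicate<R> condition) {
        return items.stream()
                .map(promiseMapping)
                .map(completeWhenMatching(condition))
                .reduce(doneWhenAnyCompleted())
                .orElse(CompletableFuture.completedFuture(null));
    }

    // Turns a future into one that completes only if its result matches.
    static <R> Function<CompletableFuture<R>, CompletableFuture<R>> completeWhenMatching(
            Predicate<R> condition) {
        return origin -> origin.thenCompose(result -> {
            CompletableFuture<R> it = new CompletableFuture<>();
            if (condition.test(result)) {
                it.complete(result);
            }
            return it; // stays pending forever when the result does not match
        });
    }

    // Combines two futures into one that completes as soon as either does.
    static <R> BinaryOperator<CompletableFuture<R>> doneWhenAnyCompleted() {
        return (main, other) -> main.applyToEither(other, result -> result);
    }

    public static void main(String[] args) {
        Function<Integer, CompletableFuture<Integer>> render =
                item -> CompletableFuture.completedFuture(item);
        Predicate<Integer> isEven = value -> value % 2 == 0;

        System.out.println(anyOf(Arrays.asList(1, 4, 7), render, isEven).join());  // 4
        // No even number: the combined future is never completed.
        System.out.println(anyOf(Arrays.asList(1, 3), render, isEven).isDone());   // false
    }
}
```

If a never-completing future is a problem for callers, a timeout on the caller's side (as the tests do with get(100, MILLISECONDS)) is one way to bound the wait.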

Tests

import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.stream.Stream;
import static java.util.Arrays.stream;
import static java.util.concurrent.CompletableFuture.*;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class AnyOfCompletableFutureTest {
    private static final RuntimeException CHECKING_ERROR = new RuntimeException("It can't be checked!");
    private static final Result VALID = Result.valid(true);
    private static final Result INVALID = Result.valid(false);
    private static final Result FAILED = Result.failed(CHECKING_ERROR);

    abstract static class AnyOfCompletableFutureContractTest {
        @Test
        void timeout() throws Throwable {
            blocking(blocker -> {
                Result blocked = VALID.blocked(blocker);
                CompletableFuture<Result> it = anyOf(blocked);

                assertThrows(TimeoutException.class, () -> fetch(it));

                blocker.countDown();
                assertThat(fetch(it), is(sameInstance(blocked)));
            });
        }

        @Test
        void returnsResultWhichIsValid() throws Throwable {
            assertThat(fetch(anyOf(VALID)), is(VALID));
            assertThat(fetch(anyOf(INVALID, VALID)), is(VALID));
        }

        @Test
        void returnsNullWithEmptyResults() throws Throwable {
            assertThat(fetch(anyOf(new Result[0])), is(nullValue()));
        }

        @Test
        void reportingErrorsWhenAnyOfFuturesFailed() throws Throwable {
            Throwable error = assertThrows(ExecutionException.class, () -> fetch(anyOf(FAILED)));
            assertThat(error.getCause(), is(sameInstance(CHECKING_ERROR)));
        }

        @Test
        void noValidResult() throws Throwable {
            assertNoValidResult(trying(anyOf(INVALID)));
        }

        @Test
        void containingOthersBlockedOnChecking() throws Throwable {
            blocking((blocker) -> {
                assertBlockedBeforeValidResult(trying(anyOf(FAILED.blocked(blocker), VALID)));
            });
        }

        @Test
        void findingStrategy() throws Throwable {
            any[] times = new any[20];
            Result[] results = random(times.length * 3);
            Set<Result> founds = new HashSet<>();

            for (any of : times) founds.add(fetch(anyOf(results)));

            assertFoundResultWithFixedResults(founds);
        }

        protected abstract void assertFoundResultWithFixedResults(Set<Result> results);

        protected abstract void assertNoValidResult(Value<Result> result) throws Exception;

        protected abstract void assertBlockedBeforeValidResult(Value<Result> result) throws Exception;

        public abstract CompletableFuture<Result> anyOf(Result... results);

        private Result[] random(int maxSize) {
            Random random = new Random();
            Result[] results = Stream.generate(() -> Result.valid(random.nextBoolean())).limit(maxSize).toArray(Result[]::new);
            results[results.length - 1] = VALID;
            return results;
        }

        class any {

        }

        private void blocking(Action<CountDownLatch> action) throws Exception {
            CountDownLatch blocking = new CountDownLatch(1);
            try {
                action.apply(blocking);
            } finally {
                blocking.countDown();
            }
        }

        Result fetch(CompletableFuture<Result> future) throws Exception {
            return trying(future).get();
        }

        Value<Result> trying(CompletableFuture<Result> future) {
            return () -> future.get(100, MILLISECONDS);
        }

        interface Action<T> {
            void apply(T t) throws Exception;
        }

        interface Value<T> {
            T get() throws Exception;
        }
    }

    @Nested
    class SyncAnyOfCompletableFutureTest extends AnyOfCompletableFutureContractTest {
        @Override
        protected void assertNoValidResult(Value<Result> result) throws Exception {
            assertThat(result.get(), is(nullValue()));
        }

        @Override
        protected void assertBlockedBeforeValidResult(Value<Result> result) {
            assertThrows(TimeoutException.class, result::get);
        }

        @Override
        protected void assertFoundResultWithFixedResults(Set<Result> results) {
            assertThat(results, hasSize(1));
        }

        @Test
        void stopsWhenTheFirstValidResultHasBeenFound() throws Throwable {
            assertThat(fetch(anyOf(VALID, FAILED)), is(VALID));
        }

        public CompletableFuture<Result> anyOf(Result... results) {
            return anyOf(Instruction::async, results);
        }

        private CompletableFuture<Result> anyOf(Function<Instruction, CompletableFuture<Result>> mapping, Result... results) {
            return anyOf(instructions(results), mapping, Result::isValid);
        }

        private <T, R> CompletableFuture<R> anyOf(List<T> items
                , Function<T, CompletableFuture<R>> promiseMapping
                , Predicate<R> condition) {

            CompletableFuture<R> result = CompletableFuture.completedFuture(null);
            for (T it : items) {
                result = result.thenCombine(promiseMapping.apply(it), (own, other) -> {
                    if (own != null && condition.test(own)) {
                        return own;
                    }
                    return condition.test(other) ? other : null;
                });
            }
            return result;
        }
    }

    @Nested
    class AsyncAnyOfCompletableFutureTest extends AnyOfCompletableFutureContractTest {
        @Override
        protected void assertNoValidResult(Value<Result> result) throws Exception {
            assertThrows(TimeoutException.class, result::get);
        }

        @Override
        protected void assertBlockedBeforeValidResult(Value<Result> result) throws Exception {
            assertThat(result.get(), is(VALID));
        }

        @Override
        protected void assertFoundResultWithFixedResults(Set<Result> results) {
            assertThat(results, hasSize(greaterThan(1)));
        }

        public CompletableFuture<Result> anyOf(Result... results) {
            Function<Instruction, CompletableFuture<Result>> mapping = (instruction) -> supplyAsync(() -> {
                // Run the check on the async thread so a blocked or failing result surfaces there.
                instruction.result.isValid();
                return instruction.result;
            });
            return anyOf(instructions(results), mapping, Result::isValid);
        }

        private <T, R> CompletableFuture<R> anyOf(List<T> items
                , Function<T, CompletableFuture<R>> promiseMapping
                , Predicate<R> condition) {
            return items.parallelStream()
                    .map(promiseMapping)
                    .map(completeWhenMatching(condition))
                    .reduce(doneWhenAnyCompleted())
                    .orElse(completedFuture(null));
        }

        private <R> Function<CompletableFuture<R>, CompletableFuture<R>> completeWhenMatching(Predicate<R> condition) {
            return origin -> origin.thenCompose(result -> {
                CompletableFuture<R> it = new CompletableFuture<>();
                if (condition.test(result)) {
                    it.complete(result);
                }
                return it;
            });
        }

        private <R> BinaryOperator<CompletableFuture<R>> doneWhenAnyCompleted() {
            return (main, other) -> main.applyToEither(other, Function.identity());
        }
    }

    private static List<Instruction> instructions(Result... results) {
        return stream(results).map(Instruction::new).collect(toList());
    }

    public interface Result {
        boolean isValid();

        static Result valid(boolean valid) {
            return new Result() {
                @Override
                public boolean isValid() {
                    return valid;
                }

                @Override
                public String toString() {
                    return valid ? "valid" : "invalid";
                }
            };
        }

        static Result failed(RuntimeException error) {
            return () -> {
                throw Objects.requireNonNull(error);
            };
        }

        default Result blocked(CountDownLatch blocker) {
            Result result = this;
            return new Result() {
                @Override
                public boolean isValid() {
                    try {
                        blocker.await();
                        return result.isValid();
                    } catch (InterruptedException ex) {
                        throw new RuntimeException(ex);
                    }
                }

                @Override
                public String toString() {
                    return result.toString() + " blocked";
                }
            };
        }
    }

    private static class Instruction {
        private Result result;

        public Instruction(Result result) {
            this.result = result;
        }

        public CompletableFuture<Result> async() {
            return supplyAsync(() -> this.result);
        }
    }
}