如何删除使用可完成未来的递归
How to remove this recursion that uses completable future
我最初编写这段代码是因为这就是我看待世界的方式,但我想研究如何删除递归。
我的想法是我有一个要呈现的指令列表。渲染器仅通过一条指令就为我提供了一个完整的未来结果。结果有时可能无效,所以我想在下一条指令上调用渲染器。我想要 return 呈现有效结果的第一条指令。
想请三位大佬看看,有空可以给点意见
/**
* I want a a completable future for a list of instructions, returning
* me the first valid result for the instruction input.
*/
private CompletableFuture<Result> getFutureResult(final List<Instruction> instructions) {
final CompletableFuture<Result> futureResult = new CompletableFuture<>();
renderAux(futureResult, instructions.iterator());
return futureResult;
}
/**
* This is actually an external method that will return a completable
* future for only one instruction. I cannot modify this API.
*/
private CompletableFuture<Result> render(final Instruction instruction) {
/.../
}
/**
* This method is called recursively for the current instruction in the
* the iterator, if the result if not valid, we call on the next instruction.
*/
private void renderAux(
final CompletableFuture<Result> futureResult,
final Iterator<Instruction> instructionIterator) {
final Instruction instruction = instructionIterator.next();
final boolean isLast = !instructionIterator.hasNext();
render(instruction).whenComplete((result, e) -> {
if (result.isValid() || isLast) {
// Complete the future on the first valid result, or
// if this is the last instruction.
futureResult.complete(result);
} else {
// Recursive call for the next instruction.
renderAux(futureResult, instructionIterator);
}
});
}
您可以直接使用 CompletableFuture.handle()
方法将期货链接在一起:
private CompletableFuture<Result> getFutureResult(final List<Instruction> instructions) {
CompletableFuture<Result> futureResult = null;
for (Instruction instruction : instructions) {
if (futureResult == null) {
futureResult = render(instruction);
} else {
futureResult = futureResult.handle((result, e) -> {
if (result.isValid())
return result;
else
return render(instruction).join();
});
}
}
return futureResult;
}
这会为列表中的每条指令创建一个额外的未来,但仍然不会执行不必要的指令。
你正在做的事情就像 CompletableFuture.anyOf but completing the future with your own condition so I called my method as anyOf
.I have implemented two versions of anyOf
, one matching the result in sequence, and another matching the result in parallel.After implementing my own anyOf
method in deep I found the interesting thing is CompletableFuture like as ES2015 Promise/jQuery Deferred。
用法
private CompletableFuture<Result> getFutureResult(List<Instruction> instructions) {
return anyOf(instructions,this::render,Result::isValid);
}
如果你想按顺序从指令中获取有效结果,那么你可以使用如下同步方法:
the sequence algorithm implemented by CompletableFuture.thenCombine method.
private <T, R> CompletableFuture<R> anyOf(List<T> items
, Function<T, CompletableFuture<R>> promiseMapping
, Predicate<R> condition) {
CompletableFuture<R> result = CompletableFuture.completedFuture(null);
for (T it : items) {
result = result.thenCombine(promiseMapping.apply(it), (own, other) -> {
if (own != null && condition.test(own)) {
return own;
}
return condition.test(other) ? other : null;
});
}
return result;
}
如果你想从指令中并行获取一个有效结果,而不管返回的是哪个有效结果,那么你可以使用下面的非同步方法,更多细节参见测试 部分:
the parallel algorithm implemented by CompletableFuture.thenCompose method.
private <T, R> CompletableFuture<R> anyOf(List<T> items
, Function<T, CompletableFuture<R>> promiseMapping
, Predicate<R> condition) {
return items.parallelStream()
.map(promiseMapping)
.map(completeWhenMatching(condition))
.reduce(doneWhenAnyCompleted())
.orElse(completedFuture(null));
}
测试
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.stream.Stream;
import static java.util.Arrays.stream;
import static java.util.concurrent.CompletableFuture.*;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class AnyOfCompletableFutureTest {
private static final RuntimeException CHECKING_ERROR = new RuntimeException("It can't be checked!");
private static final Result VALID = Result.valid(true);
private static final Result INVALID = Result.valid(false);
private static final Result FAILED = Result.failed(CHECKING_ERROR);
abstract static class AnyOfCompletableFutureContractTest {
@Test
void timeout() throws Throwable {
blocking(blocker -> {
Result blocked = VALID.blocked(blocker);
CompletableFuture<Result> it = anyOf(blocked);
assertThrows(TimeoutException.class, () -> fetch(it));
blocker.countDown();
assertThat(fetch(it), is(sameInstance(blocked)));
});
}
@Test
void returnsResultWhichIsValid() throws Throwable {
assertThat(fetch(anyOf(VALID)), is(VALID));
assertThat(fetch(anyOf(INVALID, VALID)), is(VALID));
}
@Test
void returnsNullWithEmptyResults() throws Throwable {
assertThat(fetch(anyOf(new Result[0])), is(nullValue()));
}
@Test
void reportingErrorsWhenAnyOfFuturesFailed() throws Throwable {
Throwable error = assertThrows(ExecutionException.class, () -> fetch(anyOf(FAILED)));
assertThat(error.getCause(), is(sameInstance(CHECKING_ERROR)));
}
@Test
void noValidResult() throws Throwable {
assertNoValidResult(trying(anyOf(INVALID)));
}
@Test
void containingOthersBlockedOnChecking() throws Throwable {
blocking((blocker) -> {/**/
assertBlockedBeforeValidResult(trying(anyOf(FAILED.blocked(blocker), VALID)));
});
}
@Test
void findingStrategy() throws Throwable {
any[] times = new any[20];
Result[] results = random(times.length * 3);
Set<Result> founds = new HashSet<>();
for (any of : times) founds.add(fetch(anyOf(results)));
assertFoundResultWithFixedResults(founds);
}
protected abstract void assertFoundResultWithFixedResults(Set<Result> results);
protected abstract void assertNoValidResult(Value<Result> result) throws Exception;
protected abstract void assertBlockedBeforeValidResult(Value<Result> result) throws Exception;
public abstract CompletableFuture<Result> anyOf(Result... results);
private Result[] random(int maxSize) {
Random random = new Random();
Result[] results = Stream.generate(() -> Result.valid(random.nextBoolean())).limit(maxSize).toArray(Result[]::new);
results[results.length - 1] = VALID;
return results;
}
class any {
}
private void blocking(Action<CountDownLatch> action) throws Exception {
CountDownLatch blocking = new CountDownLatch(1);
try {
action.apply(blocking);
} finally {
blocking.countDown();
}
}
Result fetch(CompletableFuture<Result> future) throws Exception {
return trying(future).get();
}
Value<Result> trying(CompletableFuture<Result> future) {
return () -> future.get(100, MILLISECONDS);
}
interface Action<T> {
void apply(T t) throws Exception;
}
interface Value<T> {
T get() throws Exception;
}
}
@Nested
class SyncAnyOfCompletableFutureTest extends AnyOfCompletableFutureContractTest {
@Override
protected void assertNoValidResult(Value<Result> result) throws Exception {
assertThat(result.get(), is(nullValue()));
}
@Override
protected void assertBlockedBeforeValidResult(Value<Result> result) {
assertThrows(TimeoutException.class, result::get);
}
@Override
protected void assertFoundResultWithFixedResults(Set<Result> results) {
assertThat(results, hasSize(1));
}
@Test
void stopsWhenTheFirstValidResultHasBeenFound() throws Throwable {
assertThat(fetch(anyOf(VALID, FAILED)), is(VALID));
}
public CompletableFuture<Result> anyOf(Result... results) {
return anyOf(Instruction::async, results);
}
private CompletableFuture<Result> anyOf(Function<Instruction, CompletableFuture<Result>> mapping, Result... results) {
return anyOf(instructions(results), mapping, Result::isValid);
}
private <T, R> CompletableFuture<R> anyOf(List<T> items
, Function<T, CompletableFuture<R>> promiseMapping
, Predicate<R> condition) {
CompletableFuture<R> result = CompletableFuture.completedFuture(null);
for (T it : items) {
result = result.thenCombine(promiseMapping.apply(it), (own, other) -> {
if (own != null && condition.test(own)) {
return own;
}
return condition.test(other) ? other : null;
});
}
return result;
}
}
@Nested
class AsyncAnyOfCompletableFutureTest extends AnyOfCompletableFutureContractTest {
@Override
protected void assertNoValidResult(Value<Result> result) throws Exception {
assertThrows(TimeoutException.class, result::get);
}
@Override
protected void assertBlockedBeforeValidResult(Value<Result> result) throws Exception {
assertThat(result.get(), is(VALID));
}
@Override
protected void assertFoundResultWithFixedResults(Set<Result> results) {
assertThat(results, hasSize(greaterThan(1)));
}
public CompletableFuture<Result> anyOf(Result... results) {
Function<Instruction, CompletableFuture<Result>> mapping = (instruction) -> supplyAsync(() -> {
instruction.result.isValid(); return instruction.result;
});
return anyOf(instructions(results), mapping, Result::isValid);
}
private <T, R> CompletableFuture<R> anyOf(List<T> items
, Function<T, CompletableFuture<R>> promiseMapping
, Predicate<R> condition) {
return items.parallelStream()
.map(promiseMapping)
.map(completeWhenMatching(condition))
.reduce(doneWhenAnyCompleted())
.orElse(completedFuture(null));
}
private <R> Function<CompletableFuture<R>, CompletableFuture<R>> completeWhenMatching(Predicate<R> condition) {
return origin -> origin.thenCompose(result -> {
CompletableFuture<R> it = new CompletableFuture<>();
if (condition.test(result)) {
it.complete(result);
}
return it;
});
}
private <R> BinaryOperator<CompletableFuture<R>> doneWhenAnyCompleted() {
return (main, other) -> main.applyToEither(other, Function.identity());
}
}
private static List<Instruction> instructions(Result... results) {
return stream(results).map(Instruction::new).collect(toList());
}
public interface Result {
boolean isValid();
static Result valid(boolean valid) {
return new Result() {
@Override
public boolean isValid() {
return valid;
}
@Override
public String toString() {
return valid ? "valid" : "invalid";
}
};
}
static Result failed(RuntimeException error) {
return () -> {
throw Objects.requireNonNull(error);
};
}
default Result blocked(CountDownLatch blocker) {
Result result = this;
return new Result() {
@Override
public boolean isValid() {
try {
blocker.await();
return result.isValid();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
@Override
public String toString() {
return result.toString() + " blocked";
}
};
}
}
private static class Instruction {
private Result result;
public Instruction(Result result) {
this.result = result;
}
public CompletableFuture<Result> async() {
return supplyAsync(() -> this.result);
}
}
}
我最初编写这段代码是因为这就是我看待世界的方式,但我想研究如何删除递归。
我的想法是我有一个要呈现的指令列表。渲染器仅通过一条指令就为我提供了一个完整的未来结果。结果有时可能无效,所以我想在下一条指令上调用渲染器。我想要 return 呈现有效结果的第一条指令。
想请三位大佬看看,有空可以给点意见
/**
* I want a a completable future for a list of instructions, returning
* me the first valid result for the instruction input.
*/
private CompletableFuture<Result> getFutureResult(final List<Instruction> instructions) {
final CompletableFuture<Result> futureResult = new CompletableFuture<>();
renderAux(futureResult, instructions.iterator());
return futureResult;
}
/**
* This is actually an external method that will return a completable
* future for only one instruction. I cannot modify this API.
*/
private CompletableFuture<Result> render(final Instruction instruction) {
/.../
}
/**
* This method is called recursively for the current instruction in the
* the iterator, if the result if not valid, we call on the next instruction.
*/
private void renderAux(
final CompletableFuture<Result> futureResult,
final Iterator<Instruction> instructionIterator) {
final Instruction instruction = instructionIterator.next();
final boolean isLast = !instructionIterator.hasNext();
render(instruction).whenComplete((result, e) -> {
if (result.isValid() || isLast) {
// Complete the future on the first valid result, or
// if this is the last instruction.
futureResult.complete(result);
} else {
// Recursive call for the next instruction.
renderAux(futureResult, instructionIterator);
}
});
}
您可以直接使用 CompletableFuture.handle()
方法将期货链接在一起:
private CompletableFuture<Result> getFutureResult(final List<Instruction> instructions) {
CompletableFuture<Result> futureResult = null;
for (Instruction instruction : instructions) {
if (futureResult == null) {
futureResult = render(instruction);
} else {
futureResult = futureResult.handle((result, e) -> {
if (result.isValid())
return result;
else
return render(instruction).join();
});
}
}
return futureResult;
}
这会为列表中的每条指令创建一个额外的未来,但仍然不会执行不必要的指令。
你正在做的事情就像 CompletableFuture.anyOf but completing the future with your own condition so I called my method as anyOf
.I have implemented two versions of anyOf
, one matching the result in sequence, and another matching the result in parallel.After implementing my own anyOf
method in deep I found the interesting thing is CompletableFuture like as ES2015 Promise/jQuery Deferred。
用法
private CompletableFuture<Result> getFutureResult(List<Instruction> instructions) {
return anyOf(instructions,this::render,Result::isValid);
}
如果你想按顺序从指令中获取有效结果,那么你可以使用如下同步方法:
the sequence algorithm implemented by CompletableFuture.thenCombine method.
private <T, R> CompletableFuture<R> anyOf(List<T> items
, Function<T, CompletableFuture<R>> promiseMapping
, Predicate<R> condition) {
CompletableFuture<R> result = CompletableFuture.completedFuture(null);
for (T it : items) {
result = result.thenCombine(promiseMapping.apply(it), (own, other) -> {
if (own != null && condition.test(own)) {
return own;
}
return condition.test(other) ? other : null;
});
}
return result;
}
如果你想从指令中并行获取一个有效结果,而不管返回的是哪个有效结果,那么你可以使用下面的非同步方法,更多细节参见测试 部分:
the parallel algorithm implemented by CompletableFuture.thenCompose method.
private <T, R> CompletableFuture<R> anyOf(List<T> items
, Function<T, CompletableFuture<R>> promiseMapping
, Predicate<R> condition) {
return items.parallelStream()
.map(promiseMapping)
.map(completeWhenMatching(condition))
.reduce(doneWhenAnyCompleted())
.orElse(completedFuture(null));
}
测试
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.stream.Stream;
import static java.util.Arrays.stream;
import static java.util.concurrent.CompletableFuture.*;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class AnyOfCompletableFutureTest {
private static final RuntimeException CHECKING_ERROR = new RuntimeException("It can't be checked!");
private static final Result VALID = Result.valid(true);
private static final Result INVALID = Result.valid(false);
private static final Result FAILED = Result.failed(CHECKING_ERROR);
abstract static class AnyOfCompletableFutureContractTest {
@Test
void timeout() throws Throwable {
blocking(blocker -> {
Result blocked = VALID.blocked(blocker);
CompletableFuture<Result> it = anyOf(blocked);
assertThrows(TimeoutException.class, () -> fetch(it));
blocker.countDown();
assertThat(fetch(it), is(sameInstance(blocked)));
});
}
@Test
void returnsResultWhichIsValid() throws Throwable {
assertThat(fetch(anyOf(VALID)), is(VALID));
assertThat(fetch(anyOf(INVALID, VALID)), is(VALID));
}
@Test
void returnsNullWithEmptyResults() throws Throwable {
assertThat(fetch(anyOf(new Result[0])), is(nullValue()));
}
@Test
void reportingErrorsWhenAnyOfFuturesFailed() throws Throwable {
Throwable error = assertThrows(ExecutionException.class, () -> fetch(anyOf(FAILED)));
assertThat(error.getCause(), is(sameInstance(CHECKING_ERROR)));
}
@Test
void noValidResult() throws Throwable {
assertNoValidResult(trying(anyOf(INVALID)));
}
@Test
void containingOthersBlockedOnChecking() throws Throwable {
blocking((blocker) -> {/**/
assertBlockedBeforeValidResult(trying(anyOf(FAILED.blocked(blocker), VALID)));
});
}
@Test
void findingStrategy() throws Throwable {
any[] times = new any[20];
Result[] results = random(times.length * 3);
Set<Result> founds = new HashSet<>();
for (any of : times) founds.add(fetch(anyOf(results)));
assertFoundResultWithFixedResults(founds);
}
protected abstract void assertFoundResultWithFixedResults(Set<Result> results);
protected abstract void assertNoValidResult(Value<Result> result) throws Exception;
protected abstract void assertBlockedBeforeValidResult(Value<Result> result) throws Exception;
public abstract CompletableFuture<Result> anyOf(Result... results);
private Result[] random(int maxSize) {
Random random = new Random();
Result[] results = Stream.generate(() -> Result.valid(random.nextBoolean())).limit(maxSize).toArray(Result[]::new);
results[results.length - 1] = VALID;
return results;
}
class any {
}
private void blocking(Action<CountDownLatch> action) throws Exception {
CountDownLatch blocking = new CountDownLatch(1);
try {
action.apply(blocking);
} finally {
blocking.countDown();
}
}
Result fetch(CompletableFuture<Result> future) throws Exception {
return trying(future).get();
}
Value<Result> trying(CompletableFuture<Result> future) {
return () -> future.get(100, MILLISECONDS);
}
interface Action<T> {
void apply(T t) throws Exception;
}
interface Value<T> {
T get() throws Exception;
}
}
@Nested
class SyncAnyOfCompletableFutureTest extends AnyOfCompletableFutureContractTest {
@Override
protected void assertNoValidResult(Value<Result> result) throws Exception {
assertThat(result.get(), is(nullValue()));
}
@Override
protected void assertBlockedBeforeValidResult(Value<Result> result) {
assertThrows(TimeoutException.class, result::get);
}
@Override
protected void assertFoundResultWithFixedResults(Set<Result> results) {
assertThat(results, hasSize(1));
}
@Test
void stopsWhenTheFirstValidResultHasBeenFound() throws Throwable {
assertThat(fetch(anyOf(VALID, FAILED)), is(VALID));
}
public CompletableFuture<Result> anyOf(Result... results) {
return anyOf(Instruction::async, results);
}
private CompletableFuture<Result> anyOf(Function<Instruction, CompletableFuture<Result>> mapping, Result... results) {
return anyOf(instructions(results), mapping, Result::isValid);
}
private <T, R> CompletableFuture<R> anyOf(List<T> items
, Function<T, CompletableFuture<R>> promiseMapping
, Predicate<R> condition) {
CompletableFuture<R> result = CompletableFuture.completedFuture(null);
for (T it : items) {
result = result.thenCombine(promiseMapping.apply(it), (own, other) -> {
if (own != null && condition.test(own)) {
return own;
}
return condition.test(other) ? other : null;
});
}
return result;
}
}
@Nested
class AsyncAnyOfCompletableFutureTest extends AnyOfCompletableFutureContractTest {
@Override
protected void assertNoValidResult(Value<Result> result) throws Exception {
assertThrows(TimeoutException.class, result::get);
}
@Override
protected void assertBlockedBeforeValidResult(Value<Result> result) throws Exception {
assertThat(result.get(), is(VALID));
}
@Override
protected void assertFoundResultWithFixedResults(Set<Result> results) {
assertThat(results, hasSize(greaterThan(1)));
}
public CompletableFuture<Result> anyOf(Result... results) {
Function<Instruction, CompletableFuture<Result>> mapping = (instruction) -> supplyAsync(() -> {
instruction.result.isValid(); return instruction.result;
});
return anyOf(instructions(results), mapping, Result::isValid);
}
private <T, R> CompletableFuture<R> anyOf(List<T> items
, Function<T, CompletableFuture<R>> promiseMapping
, Predicate<R> condition) {
return items.parallelStream()
.map(promiseMapping)
.map(completeWhenMatching(condition))
.reduce(doneWhenAnyCompleted())
.orElse(completedFuture(null));
}
private <R> Function<CompletableFuture<R>, CompletableFuture<R>> completeWhenMatching(Predicate<R> condition) {
return origin -> origin.thenCompose(result -> {
CompletableFuture<R> it = new CompletableFuture<>();
if (condition.test(result)) {
it.complete(result);
}
return it;
});
}
private <R> BinaryOperator<CompletableFuture<R>> doneWhenAnyCompleted() {
return (main, other) -> main.applyToEither(other, Function.identity());
}
}
private static List<Instruction> instructions(Result... results) {
return stream(results).map(Instruction::new).collect(toList());
}
public interface Result {
boolean isValid();
static Result valid(boolean valid) {
return new Result() {
@Override
public boolean isValid() {
return valid;
}
@Override
public String toString() {
return valid ? "valid" : "invalid";
}
};
}
static Result failed(RuntimeException error) {
return () -> {
throw Objects.requireNonNull(error);
};
}
default Result blocked(CountDownLatch blocker) {
Result result = this;
return new Result() {
@Override
public boolean isValid() {
try {
blocker.await();
return result.isValid();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
@Override
public String toString() {
return result.toString() + " blocked";
}
};
}
}
private static class Instruction {
private Result result;
public Instruction(Result result) {
this.result = result;
}
public CompletableFuture<Result> async() {
return supplyAsync(() -> this.result);
}
}
}