如何在不冒 StackOverflowError 风险的情况下使用 CompletableFuture?
How to use CompletableFuture without risking a StackOverflowError?
我想遍历异步函数的搜索 space。我将逻辑编码如下:
/**
* Assuming that a function maps a range of inputs to the same output value, minimizes the input value while
* maintaining the output value.
*
* @param previousInput the last input known to return {@code target}
* @param currentInput the new input value to evaluate
* @param function maps an input to an output value
* @param target the expected output value
* @return the minimum input value that results in the {@code target} output value
* <br>{@code @throws NullPointerException} if any argument is null
* <br>{@code @throws IllegalArgumentException} if {@code stepSize} is zero}
*/
private static CompletionStage<BigDecimal> optimizeInput(BigDecimal previousInput,
BigDecimal currentInput,
BigDecimal stepSize,
Function<BigDecimal, CompletionStage<BigDecimal>> function,
BigDecimal target)
{
return function.apply(currentInput).thenCompose(output ->
{
assertThat("stepSize", stepSize).isNotZero();
int outputMinusTarget = output.compareTo(target);
if (outputMinusTarget != 0)
return CompletableFuture.completedFuture(previousInput);
BigDecimal nextInput = currentInput.add(stepSize);
if (nextInput.compareTo(BigDecimal.ZERO) < 0)
return CompletableFuture.completedFuture(previousInput);
return optimizeInput(currentInput, nextInput, stepSize, function, target);
});
}
不幸的是,如果函数有大量搜索 space 这会在一些迭代后引发 WhosebugError。是否可以使用固定大小的堆栈迭代搜索 space?
你有以下递归结构
CompletableFuture<T> compute(...) {
return asyncTask().thenCompose(t -> {
if (...)
return completedFuture(t);
} else {
return compute(...);
}
}
}
您可以重写它以避免可完成的未来组合及其在完成期间的堆栈使用。
CompletableFuture<T> compute(...) {
CompletableFuture<T> result = new CompletableFuture<>();
computeHelper(result, ...);
return result;
}
void computeHelper(CompletableFuture<T> result, ...) {
asyncTask().thenAccept(t -> {
if (...) {
result.complete(t);
} else {
computeHelper(result, ...);
}
});
}
如果 asyncTask()
不是真正的异步并且只使用当前线程,您必须将 thenAccept
替换为它的异步版本之一以使用执行程序任务队列而不是线程堆栈。
dfogni 的回答应该可以正常工作——但为了完整起见,在使用 trampolining 类型技术同步方法的情况下,可以避免执行程序切换。
为了方便起见,我引入了一个 class 来捕获在迭代之间变化的状态,并引入了实现完成检查和生成下一个状态的方法。我相信这与您的原始逻辑相同,但您可以进行三次检查。
private static CompletionStage<BigDecimal> optimizeInput(BigDecimal previousInput,
BigDecimal currentInput,
BigDecimal stepSize,
Function<BigDecimal, CompletionStage<BigDecimal>> function,
BigDecimal target) {
class State {
BigDecimal prev;
BigDecimal curr;
BigDecimal output;
State(BigDecimal prev, BigDecimal curr, BigDecimal output) {
this.prev = prev;
this.curr = curr;
this.output = output;
}
boolean shouldContinue() {
return output.compareTo(target) == 0 && curr.add(stepSize).compareTo(BigDecimal.ZERO) >= 0;
}
CompletionStage<State> next() {
BigDecimal nextInput = curr.add(stepSize);
return function.apply(nextInput).thenApply(nextOutput -> new State(curr, nextInput, nextOutput));
}
}
/* Now it gets complicated... we have to check if we're running on the same thread we were called on. If we
* were, instead of recursively calling `next()`, we'll use PassBack to pass our new state back
* to the stack that called us.
*/
class Passback {
State state = null;
boolean isRunning = true;
State poll() {
final State c = this.state;
this.state = null;
return c;
}
}
class InputOptimizer extends CompletableFuture<BigDecimal> {
void optimize(State state, final Thread previousThread, final Passback previousPassback) {
final Thread currentThread = Thread.currentThread();
if (currentThread.equals(previousThread) && previousPassback.isRunning) {
// this is a recursive call, our caller will run it
previousPassback.state = state;
} else {
Passback passback = new Passback();
State curr = state;
do {
if (curr.shouldContinue()) {
curr.next().thenAccept(next -> optimize(next, currentThread, passback));
} else {
complete(curr.prev);
return;
}
// loop as long as we're making synchronous recursive calls
} while ((curr = passback.poll()) != null);
passback.isRunning = false;
}
}
}
InputOptimizer ret = new InputOptimizer();
function.apply(currentInput)
.thenAccept(output -> ret.optimize(
new State(previousInput, currentInput, output),
null, null));
return ret;
}
好吧,这很复杂。另外,请注意,这要求您的函数永远不会抛出异常或异常完成,这可能会出现问题。您可以对此进行泛化,这样您只需编写一次(使用正确的异常处理),可以在 asyncutil library 中找到(免责声明: 我是合著者这个图书馆)。可能还有其他具有类似功能的库,最有可能是像 Rx 这样成熟的响应式库。使用 asyncutil,
private static CompletionStage<BigDecimal> optimizeInput(BigDecimal previousInput,
BigDecimal currentInput,
BigDecimal stepSize,
Function<BigDecimal, CompletionStage<BigDecimal>> function,
BigDecimal target) {
// ... State class from before
return function
.apply(currentInput)
.thenCompose(output -> AsyncTrampoline.asyncWhile(
State::shouldContinue,
State::next,
new State(previousInput, currentInput, output)))
.thenApply(state -> state.prev);
}
我想遍历异步函数的搜索 space。我将逻辑编码如下:
/**
* Assuming that a function maps a range of inputs to the same output value, minimizes the input value while
* maintaining the output value.
*
* @param previousInput the last input known to return {@code target}
* @param currentInput the new input value to evaluate
* @param function maps an input to an output value
* @param target the expected output value
* @return the minimum input value that results in the {@code target} output value
* <br>{@code @throws NullPointerException} if any argument is null
* <br>{@code @throws IllegalArgumentException} if {@code stepSize} is zero}
*/
private static CompletionStage<BigDecimal> optimizeInput(BigDecimal previousInput,
BigDecimal currentInput,
BigDecimal stepSize,
Function<BigDecimal, CompletionStage<BigDecimal>> function,
BigDecimal target)
{
return function.apply(currentInput).thenCompose(output ->
{
assertThat("stepSize", stepSize).isNotZero();
int outputMinusTarget = output.compareTo(target);
if (outputMinusTarget != 0)
return CompletableFuture.completedFuture(previousInput);
BigDecimal nextInput = currentInput.add(stepSize);
if (nextInput.compareTo(BigDecimal.ZERO) < 0)
return CompletableFuture.completedFuture(previousInput);
return optimizeInput(currentInput, nextInput, stepSize, function, target);
});
}
不幸的是,如果函数有大量搜索 space 这会在一些迭代后引发 WhosebugError。是否可以使用固定大小的堆栈迭代搜索 space?
你有以下递归结构
CompletableFuture<T> compute(...) {
return asyncTask().thenCompose(t -> {
if (...)
return completedFuture(t);
} else {
return compute(...);
}
}
}
您可以重写它以避免可完成的未来组合及其在完成期间的堆栈使用。
CompletableFuture<T> compute(...) {
CompletableFuture<T> result = new CompletableFuture<>();
computeHelper(result, ...);
return result;
}
void computeHelper(CompletableFuture<T> result, ...) {
asyncTask().thenAccept(t -> {
if (...) {
result.complete(t);
} else {
computeHelper(result, ...);
}
});
}
如果 asyncTask()
不是真正的异步并且只使用当前线程,您必须将 thenAccept
替换为它的异步版本之一以使用执行程序任务队列而不是线程堆栈。
dfogni 的回答应该可以正常工作——但为了完整起见,在使用 trampolining 类型技术同步方法的情况下,可以避免执行程序切换。
为了方便起见,我引入了一个 class 来捕获在迭代之间变化的状态,并引入了实现完成检查和生成下一个状态的方法。我相信这与您的原始逻辑相同,但您可以进行三次检查。
private static CompletionStage<BigDecimal> optimizeInput(BigDecimal previousInput,
BigDecimal currentInput,
BigDecimal stepSize,
Function<BigDecimal, CompletionStage<BigDecimal>> function,
BigDecimal target) {
class State {
BigDecimal prev;
BigDecimal curr;
BigDecimal output;
State(BigDecimal prev, BigDecimal curr, BigDecimal output) {
this.prev = prev;
this.curr = curr;
this.output = output;
}
boolean shouldContinue() {
return output.compareTo(target) == 0 && curr.add(stepSize).compareTo(BigDecimal.ZERO) >= 0;
}
CompletionStage<State> next() {
BigDecimal nextInput = curr.add(stepSize);
return function.apply(nextInput).thenApply(nextOutput -> new State(curr, nextInput, nextOutput));
}
}
/* Now it gets complicated... we have to check if we're running on the same thread we were called on. If we
* were, instead of recursively calling `next()`, we'll use PassBack to pass our new state back
* to the stack that called us.
*/
class Passback {
State state = null;
boolean isRunning = true;
State poll() {
final State c = this.state;
this.state = null;
return c;
}
}
class InputOptimizer extends CompletableFuture<BigDecimal> {
void optimize(State state, final Thread previousThread, final Passback previousPassback) {
final Thread currentThread = Thread.currentThread();
if (currentThread.equals(previousThread) && previousPassback.isRunning) {
// this is a recursive call, our caller will run it
previousPassback.state = state;
} else {
Passback passback = new Passback();
State curr = state;
do {
if (curr.shouldContinue()) {
curr.next().thenAccept(next -> optimize(next, currentThread, passback));
} else {
complete(curr.prev);
return;
}
// loop as long as we're making synchronous recursive calls
} while ((curr = passback.poll()) != null);
passback.isRunning = false;
}
}
}
InputOptimizer ret = new InputOptimizer();
function.apply(currentInput)
.thenAccept(output -> ret.optimize(
new State(previousInput, currentInput, output),
null, null));
return ret;
}
好吧,这很复杂。另外,请注意,这要求您的函数永远不会抛出异常或异常完成,这可能会出现问题。您可以对此进行泛化,这样您只需编写一次(使用正确的异常处理),可以在 asyncutil library 中找到(免责声明: 我是合著者这个图书馆)。可能还有其他具有类似功能的库,最有可能是像 Rx 这样成熟的响应式库。使用 asyncutil,
private static CompletionStage<BigDecimal> optimizeInput(BigDecimal previousInput,
BigDecimal currentInput,
BigDecimal stepSize,
Function<BigDecimal, CompletionStage<BigDecimal>> function,
BigDecimal target) {
// ... State class from before
return function
.apply(currentInput)
.thenCompose(output -> AsyncTrampoline.asyncWhile(
State::shouldContinue,
State::next,
new State(previousInput, currentInput, output)))
.thenApply(state -> state.prev);
}