CompletableFuture 从内部注入
CompletableFuture injection from the inside
是否可以从内部在CompletableFuture
链中注入一个CompletableFuture
?
我正在使用这样的函数:
public CompletableFuture<Boolean> getFutureOfMyLongRunningTask() {
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
// ... Some processing here ...
if (somecondition failed)
return false; // Task failed!
return true; // OK
}).thenApplyAsync((Boolean result) -> {
if (!result) // check of previous stage fail
return false;
// ... Some processing here ...
if (!some condition satisfied) {
// This is where I want the injection to happen.
// This stage should be suspended and a new stage should be injected between this point and the next stage.
}
return true; // OK
}).thenApplyAsync((Boolean result) -> {
if (!result) // check of previous stage fail
return false;
// ... Some processing here ...
return true; // OK
});
// This is the result we have to wait for.
return future;
}
在注入点 if (!some condition satisfied)
,我想 运行 一个查询(比方说)需要 5 秒来执行并检索最后阶段所需的一些数据。我不想阻塞线程 5 秒,例如在 if
中使查询同步,我希望它 运行 异步,当结果返回时直接进入下一阶段。我遇到的问题是条件仅在 内部 链中已知。
有人对此有想法吗?
编辑
我会尽力澄清这个问题。我本来had一段代码而已。现在我正在尝试优化代码,以便生成更少的线程。
关键是在注入点我想发布类似的东西(抱歉,Datastax Java Cassandra 驱动程序 代码片段):
ResultSetFuture rsFuture = session.executeAsync(query);
并将那个未来注入链中。这将使调用线程 "free" 执行其他事情,而不是坐下来等待结果。
我不知道我能不能说得比这更清楚,但让我们按照这个例子。
我运行主线程中的一个循环:
for (int i = 0; i < 1000; i++) {
getFutureOfMyLongRunningTask(i);
}
此循环仅存在于主线程中,但每次调用函数 都会在线程池 P 中排队 一个新任务。现在假设 P 是一个大小为 1 的固定线程池。也就是说P中只有一个线程,只能处理1个任务。然而,主循环会将所有 1000 个任务排入队列。然后主循环将需要等待所有任务完成。
现在假设 1st 任务中的 1000 任务需要执行长时间的数据库查询。我们现在有两个选择:
查询在sync内部处理线程(属于线程池 P).这意味着我只需在 if (!some condition satisfied)
块内发出查询并等待结果。这有效地阻止 任务处理,因为线程池P 没有空闲 线程。唯一一个是 busy blocked on IO.
查询在async内部处理线程中执行(属于线程池 P)。这意味着我在 if (!some condition satisfied)
块内发出查询并立即返回我将收听的未来(可能 DB 驱动程序将产生另一个线程并阻塞 that 线程等待结果)。但是,属于 P 的线程现在 空闲 可以处理 至少 另一个任务。
在我看来,选项 2 优于选项 1,同样的推理也适用于大小为 > 的线程池1 或动态大小。
我只想让线程池尽可能空闲,以产生最少数量的线程,以避免浪费资源。
希望这是有道理的。如果不是,请你解释一下我哪里错了?
而不是使用 thenApplyAsync
,而是使用 thenCompose
或 thenComposeAsync
,这使得函数 return 成为 CompletableFuture<Foo>
而不是 Foo
.而不是 return true
如果 some condition
是 满足,你需要 return CompletableFuture.completedFuture(true)
.
public CompletableFuture<Boolean> getFutureOfMyLongRunningTask() {
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
// ... Some processing here ...
if (somecondition failed)
return false; // Task failed!
return true; // OK
}).thenComposeAsync((Boolean result) -> {
if (!result) // check of previous stage fail
return CompletableFuture.completedFuture(false);
// ... Some processing here ...
if (!some condition satisfied) {
return runSomeOtherQuery()
}
return CompletableFuture.completedFuture(true); // OK
}).thenApplyAsync((Boolean result) -> {
if (!result) // check of previous stage fail
return false;
// ... Some processing here ...
return true; // OK
});
// This is the result we have to wait for.
return future;
}
public CompletableFuture<Boolean> runSomeOtherQuery() {
....
}
您似乎在考虑在链式阶段之间拆分工作(涉及“异步”)以某种方式神奇地为您的程序逻辑增加了并发性改进。
当你链接阶段时,你正在创建一个直接的、顺序的依赖,即使你使用其中一种“异步”方法,因为后续的依赖阶段的执行不会在前一个完成之前开始。所以这种链接增加了昂贵的线程跳跃的机会,即不同的线程执行下一阶段,但不会提高并发性,因为最多仍然会有一个线程处理你的一个阶段。实际上,同一个线程恰好执行所有阶段的仍然可能的情况,很可能是最快的执行。
有一种更简单、更自然的方式来表达依赖关系。只需在一段代码中一个接一个地编写动作。您仍然可以安排该代码块进行异步执行。所以如果你的出发点是
public CompletableFuture<Boolean> getFutureOfMyLongRunningTask() {
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
// First stage processing here ...
if (somecondition failed)
return false; // Task failed!
return true; // OK
}).thenApplyAsync((Boolean result) -> {
if (!result) // check of previous stage fail
return false;
// Second stage processing here ...
if (!some condition satisfied) {
// This is where I want the injection to happen.
// This stage should be suspended and a new stage should be
// injected between this point and the next stage.
}
return true; // OK
}).thenApplyAsync((Boolean result) -> {
if (!result) // check of previous stage fail
return false;
// Third stage processing here ...
return true; // OK
});
// This is the result we have to wait for.
return future;
}
改成
public CompletableFuture<Boolean> getFutureOfMyLongRunningTask() {
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
// First stage processing here ...
if (somecondition failed)
return false; // Task failed!
// Second stage processing here ...
if (!some condition satisfied) {
// alternative "injected" stage processing
if(injected stage failed)
return false;
}
// Third stage processing here ...
return true; // OK
});
// This is the result we have to wait for.
return future;
}
更短更清晰。您不必反复检查前一阶段是否成功。您仍然具有相同的并发性,但执行效率可能更高。
是否可以从内部在CompletableFuture
链中注入一个CompletableFuture
?
我正在使用这样的函数:
public CompletableFuture<Boolean> getFutureOfMyLongRunningTask() {
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
// ... Some processing here ...
if (somecondition failed)
return false; // Task failed!
return true; // OK
}).thenApplyAsync((Boolean result) -> {
if (!result) // check of previous stage fail
return false;
// ... Some processing here ...
if (!some condition satisfied) {
// This is where I want the injection to happen.
// This stage should be suspended and a new stage should be injected between this point and the next stage.
}
return true; // OK
}).thenApplyAsync((Boolean result) -> {
if (!result) // check of previous stage fail
return false;
// ... Some processing here ...
return true; // OK
});
// This is the result we have to wait for.
return future;
}
在注入点 if (!some condition satisfied)
,我想 运行 一个查询(比方说)需要 5 秒来执行并检索最后阶段所需的一些数据。我不想阻塞线程 5 秒,例如在 if
中使查询同步,我希望它 运行 异步,当结果返回时直接进入下一阶段。我遇到的问题是条件仅在 内部 链中已知。
有人对此有想法吗?
编辑
我会尽力澄清这个问题。我本来had一段代码而已。现在我正在尝试优化代码,以便生成更少的线程。
关键是在注入点我想发布类似的东西(抱歉,Datastax Java Cassandra 驱动程序 代码片段):
ResultSetFuture rsFuture = session.executeAsync(query);
并将那个未来注入链中。这将使调用线程 "free" 执行其他事情,而不是坐下来等待结果。
我不知道我能不能说得比这更清楚,但让我们按照这个例子。
我运行主线程中的一个循环:
for (int i = 0; i < 1000; i++) {
getFutureOfMyLongRunningTask(i);
}
此循环仅存在于主线程中,但每次调用函数 都会在线程池 P 中排队 一个新任务。现在假设 P 是一个大小为 1 的固定线程池。也就是说P中只有一个线程,只能处理1个任务。然而,主循环会将所有 1000 个任务排入队列。然后主循环将需要等待所有任务完成。
现在假设 1st 任务中的 1000 任务需要执行长时间的数据库查询。我们现在有两个选择:
查询在sync内部处理线程(属于线程池 P).这意味着我只需在
if (!some condition satisfied)
块内发出查询并等待结果。这有效地阻止 任务处理,因为线程池P 没有空闲 线程。唯一一个是 busy blocked on IO.查询在async内部处理线程中执行(属于线程池 P)。这意味着我在
if (!some condition satisfied)
块内发出查询并立即返回我将收听的未来(可能 DB 驱动程序将产生另一个线程并阻塞 that 线程等待结果)。但是,属于 P 的线程现在 空闲 可以处理 至少 另一个任务。
在我看来,选项 2 优于选项 1,同样的推理也适用于大小为 > 的线程池1 或动态大小。
我只想让线程池尽可能空闲,以产生最少数量的线程,以避免浪费资源。
希望这是有道理的。如果不是,请你解释一下我哪里错了?
而不是使用 thenApplyAsync
,而是使用 thenCompose
或 thenComposeAsync
,这使得函数 return 成为 CompletableFuture<Foo>
而不是 Foo
.而不是 return true
如果 some condition
是 满足,你需要 return CompletableFuture.completedFuture(true)
.
public CompletableFuture<Boolean> getFutureOfMyLongRunningTask() {
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
// ... Some processing here ...
if (somecondition failed)
return false; // Task failed!
return true; // OK
}).thenComposeAsync((Boolean result) -> {
if (!result) // check of previous stage fail
return CompletableFuture.completedFuture(false);
// ... Some processing here ...
if (!some condition satisfied) {
return runSomeOtherQuery()
}
return CompletableFuture.completedFuture(true); // OK
}).thenApplyAsync((Boolean result) -> {
if (!result) // check of previous stage fail
return false;
// ... Some processing here ...
return true; // OK
});
// This is the result we have to wait for.
return future;
}
public CompletableFuture<Boolean> runSomeOtherQuery() {
....
}
您似乎在考虑在链式阶段之间拆分工作(涉及“异步”)以某种方式神奇地为您的程序逻辑增加了并发性改进。
当你链接阶段时,你正在创建一个直接的、顺序的依赖,即使你使用其中一种“异步”方法,因为后续的依赖阶段的执行不会在前一个完成之前开始。所以这种链接增加了昂贵的线程跳跃的机会,即不同的线程执行下一阶段,但不会提高并发性,因为最多仍然会有一个线程处理你的一个阶段。实际上,同一个线程恰好执行所有阶段的仍然可能的情况,很可能是最快的执行。
有一种更简单、更自然的方式来表达依赖关系。只需在一段代码中一个接一个地编写动作。您仍然可以安排该代码块进行异步执行。所以如果你的出发点是
public CompletableFuture<Boolean> getFutureOfMyLongRunningTask() {
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
// First stage processing here ...
if (somecondition failed)
return false; // Task failed!
return true; // OK
}).thenApplyAsync((Boolean result) -> {
if (!result) // check of previous stage fail
return false;
// Second stage processing here ...
if (!some condition satisfied) {
// This is where I want the injection to happen.
// This stage should be suspended and a new stage should be
// injected between this point and the next stage.
}
return true; // OK
}).thenApplyAsync((Boolean result) -> {
if (!result) // check of previous stage fail
return false;
// Third stage processing here ...
return true; // OK
});
// This is the result we have to wait for.
return future;
}
改成
public CompletableFuture<Boolean> getFutureOfMyLongRunningTask() {
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
// First stage processing here ...
if (somecondition failed)
return false; // Task failed!
// Second stage processing here ...
if (!some condition satisfied) {
// alternative "injected" stage processing
if(injected stage failed)
return false;
}
// Third stage processing here ...
return true; // OK
});
// This is the result we have to wait for.
return future;
}
更短更清晰。您不必反复检查前一阶段是否成功。您仍然具有相同的并发性,但执行效率可能更高。