CompletableFuture:等待完成百分比
CompletableFuture: Await percentage complete
我正在将相同的数据并行写入分布式系统的 n 个节点。
当这些节点中的 n% 已成功写入时,剩余的写入其他节点并不重要,因为 n% 保证了其他节点之间的复制。
Java 的 CompletableFuture 似乎非常接近我想要的,例如:
CompletableFuture.anyOf()
(Returns 当第一个 future 完成时) - 避免不必要的等待,但是 returns 太快了,因为我需要 n% 完成
CompletableFuture.allOf()
(Returns 当所有 futures 完成时) - 避免 return 过早但不必要地等待 100% 完成
我正在寻找一种方法来 return 当特定百分比的期货已经完成时。
例如,如果我提供 10 个期货,return 当其中 6 个或 60% 成功完成时。
例如,Bluebird JS与
具有此功能
Promise.some(promises, countThatNeedToComplete)
我想知道我是否可以在 java
中使用 TheadExecutor 或 vanilla CompletableFuture 做类似的事情
我相信您可以只使用 CompletableFuture
已经提供的内容来实现您想要的,但是您必须实施额外的控制才能知道有多少未来任务已经完成,以及当您达到 number/percentage您需要的,取消剩余的任务。
下面是一个class来说明这个想法:
public class CompletableSome<T>
{
private List<CompletableFuture<Void>> tasks;
private int tasksCompleted = 0;
public CompletableSome(List<CompletableFuture<T>> tasks, int percentOfTasksThatMustComplete)
{
int minTasksThatMustComplete = tasks.size() * percentOfTasksThatMustComplete / 100;
System.out.println(
String.format("Need to complete at least %s%% of the %s tasks provided, which means %s tasks.",
percentOfTasksThatMustComplete, tasks.size(), minTasksThatMustComplete));
this.tasks = new ArrayList<>(tasks.size());
for (CompletableFuture<?> task : tasks)
{
this.tasks.add(task.thenAccept(a -> {
// thenAccept will be called right after the future task is completed. At this point we'll
// check if we reached the minimum number of nodes needed. If we did, then complete the
// remaining tasks since they are no longer needed.
tasksCompleted++;
if (tasksCompleted >= minTasksThatMustComplete)
{
tasks.forEach(t -> t.complete(null));
}
}));
}
}
public void execute()
{
CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[0])).join();
}
}
您将使用此 class,如下例所示:
public static void main(String[] args)
{
int numberOfNodes = 4;
// Create one future task for each node.
List<CompletableFuture<String>> nodes = new ArrayList<>();
for (int i = 1; i <= numberOfNodes; i++)
{
String nodeId = "result" + i;
nodes.add(CompletableFuture.supplyAsync(() -> {
try
{
// Sleep for some time to avoid all tasks to complete before the count is checked.
Thread.sleep(100 + new Random().nextInt(500));
}
catch (InterruptedException e)
{
e.printStackTrace();
}
// The action here is just to print the nodeId, you would make the actual call here.
System.out.println(nodeId + " completed.");
return nodeId;
}));
}
// Here we're saying that just 75% of the nodes must be called successfully.
CompletableSome<String> tasks = new CompletableSome<>(nodes, 75);
tasks.execute();
}
请注意,使用此解决方案,您最终可能会执行比最低要求更多的任务——例如,当两个或更多节点几乎同时响应时,您可能会在第一个节点响应时达到最低要求数量,但是没有时间取消其他任务。如果这是一个问题,那么您将不得不实施更多的控制。
我正在将相同的数据并行写入分布式系统的 n 个节点。
当这些节点中的 n% 已成功写入时,剩余的写入其他节点并不重要,因为 n% 保证了其他节点之间的复制。
Java 的 CompletableFuture 似乎非常接近我想要的,例如:
CompletableFuture.anyOf()
(Returns 当第一个 future 完成时) - 避免不必要的等待,但是 returns 太快了,因为我需要 n% 完成
CompletableFuture.allOf()
(Returns 当所有 futures 完成时) - 避免 return 过早但不必要地等待 100% 完成
我正在寻找一种方法来 return 当特定百分比的期货已经完成时。 例如,如果我提供 10 个期货,return 当其中 6 个或 60% 成功完成时。
例如,Bluebird JS与
具有此功能Promise.some(promises, countThatNeedToComplete)
我想知道我是否可以在 java
中使用 TheadExecutor 或 vanilla CompletableFuture 做类似的事情我相信您可以只使用 CompletableFuture
已经提供的内容来实现您想要的,但是您必须实施额外的控制才能知道有多少未来任务已经完成,以及当您达到 number/percentage您需要的,取消剩余的任务。
下面是一个class来说明这个想法:
public class CompletableSome<T>
{
private List<CompletableFuture<Void>> tasks;
private int tasksCompleted = 0;
public CompletableSome(List<CompletableFuture<T>> tasks, int percentOfTasksThatMustComplete)
{
int minTasksThatMustComplete = tasks.size() * percentOfTasksThatMustComplete / 100;
System.out.println(
String.format("Need to complete at least %s%% of the %s tasks provided, which means %s tasks.",
percentOfTasksThatMustComplete, tasks.size(), minTasksThatMustComplete));
this.tasks = new ArrayList<>(tasks.size());
for (CompletableFuture<?> task : tasks)
{
this.tasks.add(task.thenAccept(a -> {
// thenAccept will be called right after the future task is completed. At this point we'll
// check if we reached the minimum number of nodes needed. If we did, then complete the
// remaining tasks since they are no longer needed.
tasksCompleted++;
if (tasksCompleted >= minTasksThatMustComplete)
{
tasks.forEach(t -> t.complete(null));
}
}));
}
}
public void execute()
{
CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[0])).join();
}
}
您将使用此 class,如下例所示:
public static void main(String[] args)
{
int numberOfNodes = 4;
// Create one future task for each node.
List<CompletableFuture<String>> nodes = new ArrayList<>();
for (int i = 1; i <= numberOfNodes; i++)
{
String nodeId = "result" + i;
nodes.add(CompletableFuture.supplyAsync(() -> {
try
{
// Sleep for some time to avoid all tasks to complete before the count is checked.
Thread.sleep(100 + new Random().nextInt(500));
}
catch (InterruptedException e)
{
e.printStackTrace();
}
// The action here is just to print the nodeId, you would make the actual call here.
System.out.println(nodeId + " completed.");
return nodeId;
}));
}
// Here we're saying that just 75% of the nodes must be called successfully.
CompletableSome<String> tasks = new CompletableSome<>(nodes, 75);
tasks.execute();
}
请注意,使用此解决方案,您最终可能会执行比最低要求更多的任务——例如,当两个或更多节点几乎同时响应时,您可能会在第一个节点响应时达到最低要求数量,但是没有时间取消其他任务。如果这是一个问题,那么您将不得不实施更多的控制。