在 CompletableFuture 上同步使用 acceptEither 进行任务调度
Task scheduling using acceptEither synchronously on CompletableFuture
我正在通过 CompletableFuture API 学习并发。假设我有两个任务:一个需要 250 毫秒,另一个需要 2500 毫秒。在以下代码中:
Supplier<List<Long>> supplyIds = () -> {
sleep(200);
return(Arrays.asList(1L, 2L, 3L));
};
Function<List<Long>, CompletableFuture<List<User>>> fetchUsers1 = idList -> {
sleep(250);
System.out.println("User2"+ Thread.currentThread().getName());
Supplier<List<User>> userSupplier = () -> idList.stream().map(User::new).collect(Collectors.toList());
return(CompletableFuture.supplyAsync(userSupplier));
};
Function<List<Long>, CompletableFuture<List<User>>> fetchUsers2 = idList -> {
sleep(2500);
System.out.println("User2"+ Thread.currentThread().getName());
Supplier<List<User>> userSupplier = () -> idList.stream().map(User::new).collect(Collectors.toList());
return(CompletableFuture.supplyAsync(userSupplier));
};
Consumer<List<User>> displayer = users -> {
users.forEach(System.out::println);
};
CompletableFuture<List<Long>> completableFuture = CompletableFuture.supplyAsync(supplyIds);
CompletableFuture<List<User>> users1 = completableFuture.thenCompose(fetchUsers1);
CompletableFuture<List<User>> users2 = completableFuture.thenCompose(fetchUsers2);
users1.thenRun(()-> System.out.println("User 1"));
users2.thenRun(()-> System.out.println("User 2"));
users1.acceptEither(users2, displayer);
sleep(6000);
我得到以下结果:
User2ForkJoinPool.commonPool-worker-1
User 2
1
2
3
User2ForkJoinPool.commonPool-worker-1
User 1
我知道代码是 运行 同步的,因为正在使用相同的公共分叉连接池线程,而我们没有指定线程。我很困惑为什么先执行fetchUsers2
任务然后执行fetchUsers1
任务(这似乎与每个运行一致)。我假设由于 thenCompose
在代码中首先在 fetchUsers1
上被调用,所以它首先是 'queued up'。
文档中没有任何内容说明调用顺序对 thenCompose
很重要。
由于您定义了两个 独立 阶段,两者都仅取决于 completableFuture
,因此 users1
和 user2
之间没有定义顺序,并且结果顺序仅取决于实现。
您可能会在一个环境中重复获得特定订单,但在不同环境中可能会获得不同订单。即使在您的环境中,也有可能在某些运行中获得不同的顺序。如果启动线程在调用 supplyAsync(supplyIds)
200 毫秒后丢失 CPU,它可能会在调用 thenCompose(fetchUsers2)
之前立即执行 thenCompose(fetchUsers1)
指定的操作。
当两个动作之间的顺序很重要时,您必须对它们之间的依赖关系建模。
注意同样的代码
users1.thenRun(()-> System.out.println("User 1"));
users2.thenRun(()-> System.out.println("User 2"));
users1.acceptEither(users2, displayer);
定义完全独立的动作。由于 acceptEither
应用于 users1
和 users2
,而不是 thenRun
调用返回的完成阶段,因此它不依赖于打印语句的完成。这三个动作可以按任何顺序执行。
我正在通过 CompletableFuture API 学习并发。假设我有两个任务:一个需要 250 毫秒,另一个需要 2500 毫秒。在以下代码中:
Supplier<List<Long>> supplyIds = () -> {
sleep(200);
return(Arrays.asList(1L, 2L, 3L));
};
Function<List<Long>, CompletableFuture<List<User>>> fetchUsers1 = idList -> {
sleep(250);
System.out.println("User2"+ Thread.currentThread().getName());
Supplier<List<User>> userSupplier = () -> idList.stream().map(User::new).collect(Collectors.toList());
return(CompletableFuture.supplyAsync(userSupplier));
};
Function<List<Long>, CompletableFuture<List<User>>> fetchUsers2 = idList -> {
sleep(2500);
System.out.println("User2"+ Thread.currentThread().getName());
Supplier<List<User>> userSupplier = () -> idList.stream().map(User::new).collect(Collectors.toList());
return(CompletableFuture.supplyAsync(userSupplier));
};
Consumer<List<User>> displayer = users -> {
users.forEach(System.out::println);
};
CompletableFuture<List<Long>> completableFuture = CompletableFuture.supplyAsync(supplyIds);
CompletableFuture<List<User>> users1 = completableFuture.thenCompose(fetchUsers1);
CompletableFuture<List<User>> users2 = completableFuture.thenCompose(fetchUsers2);
users1.thenRun(()-> System.out.println("User 1"));
users2.thenRun(()-> System.out.println("User 2"));
users1.acceptEither(users2, displayer);
sleep(6000);
我得到以下结果:
User2ForkJoinPool.commonPool-worker-1
User 2
1
2
3
User2ForkJoinPool.commonPool-worker-1
User 1
我知道代码是 运行 同步的,因为正在使用相同的公共分叉连接池线程,而我们没有指定线程。我很困惑为什么先执行fetchUsers2
任务然后执行fetchUsers1
任务(这似乎与每个运行一致)。我假设由于 thenCompose
在代码中首先在 fetchUsers1
上被调用,所以它首先是 'queued up'。
文档中没有任何内容说明调用顺序对 thenCompose
很重要。
由于您定义了两个 独立 阶段,两者都仅取决于 completableFuture
,因此 users1
和 user2
之间没有定义顺序,并且结果顺序仅取决于实现。
您可能会在一个环境中重复获得特定订单,但在不同环境中可能会获得不同订单。即使在您的环境中,也有可能在某些运行中获得不同的顺序。如果启动线程在调用 supplyAsync(supplyIds)
200 毫秒后丢失 CPU,它可能会在调用 thenCompose(fetchUsers2)
之前立即执行 thenCompose(fetchUsers1)
指定的操作。
当两个动作之间的顺序很重要时,您必须对它们之间的依赖关系建模。
注意同样的代码
users1.thenRun(()-> System.out.println("User 1"));
users2.thenRun(()-> System.out.println("User 2"));
users1.acceptEither(users2, displayer);
定义完全独立的动作。由于 acceptEither
应用于 users1
和 users2
,而不是 thenRun
调用返回的完成阶段,因此它不依赖于打印语句的完成。这三个动作可以按任何顺序执行。