并行执行按顺序阻塞任务组
Executing in-parallel groups of in-order blocking tasks
我考虑这个问题已经有一段时间了,而且我正在学习越来越多关于线程、执行器等的知识。对executors和threads有了一个大概的了解,但是感觉有点卡
这是我正在尝试做的事情。
有命令,也有动作。
一个命令被命名并且可以被用户任意调用,例如!playsong、!cheer 等。
Action 是将工作发送给服务的东西;例如要求websocket客户端发送一条新消息,或者要求IRC客户端发送一条新消息等
执行命令时,它会按顺序依次执行其操作。
例如,!cheer 命令可能有四个操作:
- 发出一个websocket请求,并等待成功响应(例如:在OBS中显示一个场景项)
- 发送 IRC 消息(例如:发送聊天消息)。一旦发送,
- 等待 1-3 秒(例如:等待视频播放完毕)。等待完成后,
- 发出另一个 websocket 请求(例如:隐藏步骤 1 中的场景项)
这些不仅必须按顺序执行,而且我们也不能让它们全部同时开始(首先完成操作 1、2 和 4,然后最后完成操作 3);每个 Action 取决于先完成其前身。
最重要的是,客户端可以随时任意提交命令,并且不能相互阻塞。例如,!longcommand 可以启动但不会阻止 !shortcommand 启动(假设底层服务未被阻止)。
这是我想做的事情:
我知道我可以使用 Future/Callable 来阻止在给定线程上等待执行结果,因此每个 Action 都应该 return 一个 Future 当 运行 (Future 来自它的它使用的相应服务)。然后,我可以像这样在命令上以阻塞方式简单地逐一调用操作,以确保它们按顺序执行并且每个操作都等待另一个操作完成:
class ExecutableCommand implments Runnable {
// omitted for brevity
run() {
for(Action action:command.getActions()) {
action.run().get();
}
}
但是我将如何处理执行命令?
我想我会通过一个执行器提交每个命令,也许是像这样的 ThreadPoolExecutor,因为每个命令都被提交了?
class ExecutorServiceWrapper {
private final ExecutorService executorService = Executors.newThreadPoolExecutor(4);
void submit(ExecutableCommand command) {
executorService.submit(command)
}
}
然后每个客户端 ofc 将简单地保留对 ExecutorServiceWrapper 的引用并调用它以响应触发它们的事件:
class FromChatHandler() {
private final ExecutorServiceWrapper masterQueue;
onMessage(String message) {
Command command = // parse what command to lookup from message
masterQueue.submit(command)
}
}
@RestController // or whatever
class MyController() {
private final ExecutorServiceWrapper masterQueue;
@Post
executeCommandByName(String commandName) {
Command command = // lookup command
masterQueue.submit(command)
}
}
class directHandler() {
private final ExecutorServiceWrapper masterQueue;
handle(Command command) {
Command command = // build the command given the message
masterQueue.submit(command)
}
}
我假设因为每个命令都被提交给执行器,所以每个命令都会进入自己的线程,因此它不会阻塞其他线程。
但我不确定我是否应该像我一样使用 ExecutableCommand 执行上面的操作并执行命令中的每个操作。
另外,我不确定它是否会处理这种情况:
线程池固定为 5 个线程。
已执行 5 个命令。它们很长 运行ning 并使用不同的服务,但底层服务未被阻止并且仍然可以接受工作。
有人试图执行第 6 个命令 -- 他们不应该被阻止,因为底层服务仍然可以接受工作。
有没有更好的方法来做到这一点?我在正确的轨道上吗?
在这上面多花点时间后,我想出了一些使用执行器或期货的可能解决方案。不确定哪个比另一个更好,但因为我知道我可以扩展 ThreadPoolExecutor(比如,添加暂停功能)我可能会倾向于 Executors。
否则,如果有人有意见,随时欢迎!
我暂时将这两个解决方案保留在我的 GH 中 (),但我也会将它们放在下面。
https://github.com/TinaTiel/concurrency-learning
期货实施
package futures;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
public class CommandActionExample {
public static void main(String[] args) {
// Initialize some starting params
Random random = new Random();
int maxActions = 20;
int maxCommands = 5;
// Generate some commands, with a random number of actions.
// We'll use the indexes as the command and action names to keep it simple/readable
List<Command> commands = new ArrayList<>();
for(Integer c = 0; c < maxCommands; c++) {
Command command = new Command(String.format("%d", c+1));
for(Integer a = 0; a < random.nextInt(maxActions); a++) {
Action action = new Action(random, String.format("%d", a+1));
command.addAction(action);
}
commands.add(command);
}
// Print out the commands we'll execute, again to keep the results readable/understandable
System.out.println("Commands to execute: \n" + commands.stream().map(Command::toString).collect(Collectors.joining("\n")) + "\n");
// Build a Future that tries to execute all commands (supplied as Futures) in an arbitrary order
try {
CompletableFuture.allOf(commands.stream()
.map((Function<Command, CompletableFuture<Void>>) CompletableFuture::runAsync)
.collect(Collectors.toList())
.toArray(CompletableFuture[]::new)
).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
// commands.get(0).run(); // sanity check one of the command's actions run as expected
// When we execute the results, the actions should be executed in-order within a command at some point in the future
// (not started all at once), so something like:
// 0 Command-2:Action-1 scheduled at 34
// 0 Command-1:Action-1 scheduled at 21
// 0 Command-3:Action-1 scheduled at 4
// 4 Command-3:Action2 scheduled at ...
// 21 Command-1:Action-2 scheduled at ...
// 34 Command-1-Action-2 scheduled at ...
// ...
// Now how to test this...Maybe with JUnit inOrder.verify(...).run() ?
}
public static class Action implements Runnable {
private Command command;
private final Random random;
private final String name;
public Action(Random random, String name) {
this.random = random;
this.name = name;
}
public void setCommand(Command command) {
this.command = command;
}
@Override
public void run() {
// Simply sleep for a random period of time. This simulates pieces of work being done (network request, etc.)
long msTime = random.nextInt(1000);
System.out.println(new Timestamp(System.currentTimeMillis()) + ": Command-" + command.name + ":Action-" + name + " executing on Thread '" + Thread.currentThread().getName() + "' executing for " + msTime + "ms");
try {
Thread.sleep(msTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Action{" +
"name='" + name + '\'' +
'}';
}
}
public static class Command implements Runnable {
private final String name;
private final List<Action> actions = new ArrayList<>();
public Command(String name) {
this.name = name;
}
public void addAction(Action action) {
action.setCommand(this);
actions.add(action);
}
@Override
public void run() {
// If there are no actions, then do nothing
if(actions.isEmpty()) return;
// Build up a chain of futures.
// Looks like we have to build them up in reverse order, so start with the first action...
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(actions.remove(0));
// ...And then reverse the list and build the rest of the chain
// (yes we could execute backwards...but it's not common and I/others probably don't like to reason about it)
Collections.reverse(actions);
for(int i=0; i< actions.size(); i++) {
completableFuture.thenRun(actions.get(i));
}
// Execute our chain
try {
completableFuture.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Command{" +
"name='" + name + '\'' +
", actions=" + actions +
'}';
}
}
}
结果
输出和进度符合预期,但 Futures 似乎使用了 ForkJoinPool。
Commands to execute:
Command{name='1', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}]}
Command{name='2', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}]}
Command{name='3', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}, Action{name='7'}]}
Command{name='4', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}]}
Command{name='5', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}]}
2020-12-30 21:17:27.11: Command-2:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 207ms
2020-12-30 21:17:27.11: Command-4:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 930ms
2020-12-30 21:17:27.11: Command-1:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-3' executing for 948ms
2020-12-30 21:17:27.11: Command-3:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 173ms
2020-12-30 21:17:27.11: Command-5:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-11' executing for 348ms
2020-12-30 21:17:27.314: Command-3:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 890ms
2020-12-30 21:17:27.345: Command-2:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 178ms
2020-12-30 21:17:27.485: Command-5:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-11' executing for 702ms
2020-12-30 21:17:27.485: Command-5:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 161ms
2020-12-30 21:17:27.532: Command-2:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 201ms
2020-12-30 21:17:27.657: Command-5:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 257ms
2020-12-30 21:17:27.735: Command-2:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 518ms
2020-12-30 21:17:27.919: Command-5:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 258ms
2020-12-30 21:17:28.06: Command-4:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 926ms
2020-12-30 21:17:28.075: Command-1:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-3' executing for 413ms
2020-12-30 21:17:28.184: Command-5:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 77ms
2020-12-30 21:17:28.216: Command-3:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 487ms
2020-12-30 21:17:28.263: Command-2:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 570ms
2020-12-30 21:17:28.497: Command-1:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-3' executing for 273ms
2020-12-30 21:17:28.716: Command-3:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 302ms
2020-12-30 21:17:28.833: Command-2:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 202ms
2020-12-30 21:17:28.992: Command-4:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 733ms
2020-12-30 21:17:29.024: Command-3:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 756ms
2020-12-30 21:17:29.727: Command-4:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 131ms
2020-12-30 21:17:29.78: Command-3:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 920ms
2020-12-30 21:17:29.858: Command-4:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 305ms
2020-12-30 21:17:30.168: Command-4:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 612ms
2020-12-30 21:17:30.715: Command-3:Action-7 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 330ms
执行者实施
package executors;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Collectors;
public class CommandActionExample {
public static void main(String[] args) {
// Initialize some starting params
Random random = new Random();
int maxActions = 20;
int maxCommands = 5;
// Generate some commands, with a random number of actions.
// We'll use the indexes as the command and action names to keep it simple/readable
List<Command> commands = new ArrayList<>();
for(Integer c = 0; c < maxCommands; c++) {
Command command = new Command(String.format("%d", c+1));
for(Integer a = 0; a < random.nextInt(maxActions); a++) {
Action action = new Action(random, String.format("%d", a+1));
command.addAction(action);
}
commands.add(command);
}
// Print out the commands we'll execute, again to keep the results readable/understandable
System.out.println("Commands to execute: \n" + commands.stream().map(Command::toString).collect(Collectors.joining("\n")) + "\n");
ExecutorService executorService = Executors.newFixedThreadPool(20);
for(Command command:commands) executorService.submit(command);
// When we execute the results, the actions should be executed in-order within a command at some point in the future
// (not started all at once), so something like:
// 0 Command-2:Action-1 scheduled at 34
// 0 Command-1:Action-1 scheduled at 21
// 0 Command-3:Action-1 scheduled at 4
// 4 Command-3:Action2 scheduled at ...
// 21 Command-1:Action-2 scheduled at ...
// 34 Command-1-Action-2 scheduled at ...
// ...
// Now how to test this...Maybe with JUnit inOrder.verify(...).run() ?
}
public static class Action implements Runnable {
private Command command;
private final Random random;
private final String name;
public Action(Random random, String name) {
this.random = random;
this.name = name;
}
public void setCommand(Command command) {
this.command = command;
}
@Override
public void run() {
// Simply sleep for a random period of time. This simulates pieces of work being done (network request, etc.)
long msTime = random.nextInt(1000);
System.out.println(new Timestamp(System.currentTimeMillis()) + ": Command-" + command.name + ":Action-" + name + " executing on Thread '" + Thread.currentThread().getName() + "' executing for " + msTime + "ms");
try {
Thread.sleep(msTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Action{" +
"name='" + name + '\'' +
'}';
}
}
public static class Command implements Runnable {
private final String name;
private final List<Action> actions = new ArrayList<>();
public Command(String name) {
this.name = name;
}
public void addAction(Action action) {
action.setCommand(this);
actions.add(action);
}
@Override
public void run() {
// If there are no actions, then do nothing
if(actions.isEmpty()) return;
ExecutorService executor = Executors.newSingleThreadExecutor(); // Because there is only one thread, this has the effect of executing sequentially and blocking
for(Action action:actions) executor.submit(action);
}
@Override
public String toString() {
return "Command{" +
"name='" + name + '\'' +
", actions=" + actions +
'}';
}
}
}
结果
产出和进度符合预期
2020-12-31 09:43:09.952: Command-3:Action-1 executing on Thread 'pool-4-thread-1' executing for 632ms
2020-12-31 09:43:09.952: Command-4:Action-1 executing on Thread 'pool-3-thread-1' executing for 932ms
2020-12-31 09:43:09.952: Command-2:Action-1 executing on Thread 'pool-6-thread-1' executing for 586ms
2020-12-31 09:43:09.952: Command-5:Action-1 executing on Thread 'pool-5-thread-1' executing for 987ms
2020-12-31 09:43:09.952: Command-1:Action-1 executing on Thread 'pool-2-thread-1' executing for 706ms
2020-12-31 09:43:10.562: Command-2:Action-2 executing on Thread 'pool-6-thread-1' executing for 329ms
2020-12-31 09:43:10.608: Command-3:Action-2 executing on Thread 'pool-4-thread-1' executing for 503ms
2020-12-31 09:43:10.891: Command-2:Action-3 executing on Thread 'pool-6-thread-1' executing for 443ms
2020-12-31 09:43:10.9: Command-4:Action-2 executing on Thread 'pool-3-thread-1' executing for 866ms
2020-12-31 09:43:10.955: Command-5:Action-2 executing on Thread 'pool-5-thread-1' executing for 824ms
2020-12-31 09:43:11.346: Command-2:Action-4 executing on Thread 'pool-6-thread-1' executing for 502ms
2020-12-31 09:43:11.766: Command-4:Action-3 executing on Thread 'pool-3-thread-1' executing for 638ms
2020-12-31 09:43:11.779: Command-5:Action-3 executing on Thread 'pool-5-thread-1' executing for 928ms
2020-12-31 09:43:11.848: Command-2:Action-5 executing on Thread 'pool-6-thread-1' executing for 179ms
2020-12-31 09:43:12.037: Command-2:Action-6 executing on Thread 'pool-6-thread-1' executing for 964ms
2020-12-31 09:43:12.412: Command-4:Action-4 executing on Thread 'pool-3-thread-1' executing for 370ms
2020-12-31 09:43:12.709: Command-5:Action-4 executing on Thread 'pool-5-thread-1' executing for 204ms
2020-12-31 09:43:12.783: Command-4:Action-5 executing on Thread 'pool-3-thread-1' executing for 769ms
2020-12-31 09:43:12.913: Command-5:Action-5 executing on Thread 'pool-5-thread-1' executing for 188ms
2020-12-31 09:43:13.102: Command-5:Action-6 executing on Thread 'pool-5-thread-1' executing for 524ms
2020-12-31 09:43:13.555: Command-4:Action-6 executing on Thread 'pool-3-thread-1' executing for 673ms
2020-12-31 09:43:13.634: Command-5:Action-7 executing on Thread 'pool-5-thread-1' executing for 890ms
2020-12-31 09:43:14.23: Command-4:Action-7 executing on Thread 'pool-3-thread-1' executing for 147ms
2020-12-31 09:43:14.527: Command-5:Action-8 executing on Thread 'pool-5-thread-1' executing for 538ms
我考虑这个问题已经有一段时间了,而且我正在学习越来越多关于线程、执行器等的知识。对executors和threads有了一个大概的了解,但是感觉有点卡
这是我正在尝试做的事情。
有命令,也有动作。 一个命令被命名并且可以被用户任意调用,例如!playsong、!cheer 等。 Action 是将工作发送给服务的东西;例如要求websocket客户端发送一条新消息,或者要求IRC客户端发送一条新消息等
执行命令时,它会按顺序依次执行其操作。
例如,!cheer 命令可能有四个操作:
- 发出一个websocket请求,并等待成功响应(例如:在OBS中显示一个场景项)
- 发送 IRC 消息(例如:发送聊天消息)。一旦发送,
- 等待 1-3 秒(例如:等待视频播放完毕)。等待完成后,
- 发出另一个 websocket 请求(例如:隐藏步骤 1 中的场景项)
这些不仅必须按顺序执行,而且我们也不能让它们全部同时开始(首先完成操作 1、2 和 4,然后最后完成操作 3);每个 Action 取决于先完成其前身。
最重要的是,客户端可以随时任意提交命令,并且不能相互阻塞。例如,!longcommand 可以启动但不会阻止 !shortcommand 启动(假设底层服务未被阻止)。
这是我想做的事情:
我知道我可以使用 Future/Callable 来阻止在给定线程上等待执行结果,因此每个 Action 都应该 return 一个 Future 当 运行 (Future 来自它的它使用的相应服务)。然后,我可以像这样在命令上以阻塞方式简单地逐一调用操作,以确保它们按顺序执行并且每个操作都等待另一个操作完成:
class ExecutableCommand implments Runnable {
// omitted for brevity
run() {
for(Action action:command.getActions()) {
action.run().get();
}
}
但是我将如何处理执行命令? 我想我会通过一个执行器提交每个命令,也许是像这样的 ThreadPoolExecutor,因为每个命令都被提交了?
class ExecutorServiceWrapper {
private final ExecutorService executorService = Executors.newThreadPoolExecutor(4);
void submit(ExecutableCommand command) {
executorService.submit(command)
}
}
然后每个客户端 ofc 将简单地保留对 ExecutorServiceWrapper 的引用并调用它以响应触发它们的事件:
class FromChatHandler() {
private final ExecutorServiceWrapper masterQueue;
onMessage(String message) {
Command command = // parse what command to lookup from message
masterQueue.submit(command)
}
}
@RestController // or whatever
class MyController() {
private final ExecutorServiceWrapper masterQueue;
@Post
executeCommandByName(String commandName) {
Command command = // lookup command
masterQueue.submit(command)
}
}
class directHandler() {
private final ExecutorServiceWrapper masterQueue;
handle(Command command) {
Command command = // build the command given the message
masterQueue.submit(command)
}
}
我假设因为每个命令都被提交给执行器,所以每个命令都会进入自己的线程,因此它不会阻塞其他线程。
但我不确定我是否应该像我一样使用 ExecutableCommand 执行上面的操作并执行命令中的每个操作。
另外,我不确定它是否会处理这种情况: 线程池固定为 5 个线程。 已执行 5 个命令。它们很长 运行ning 并使用不同的服务,但底层服务未被阻止并且仍然可以接受工作。 有人试图执行第 6 个命令 -- 他们不应该被阻止,因为底层服务仍然可以接受工作。
有没有更好的方法来做到这一点?我在正确的轨道上吗?
在这上面多花点时间后,我想出了一些使用执行器或期货的可能解决方案。不确定哪个比另一个更好,但因为我知道我可以扩展 ThreadPoolExecutor(比如,添加暂停功能)我可能会倾向于 Executors。
否则,如果有人有意见,随时欢迎!
我暂时将这两个解决方案保留在我的 GH 中 (),但我也会将它们放在下面。 https://github.com/TinaTiel/concurrency-learning
期货实施
package futures;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
public class CommandActionExample {
public static void main(String[] args) {
// Initialize some starting params
Random random = new Random();
int maxActions = 20;
int maxCommands = 5;
// Generate some commands, with a random number of actions.
// We'll use the indexes as the command and action names to keep it simple/readable
List<Command> commands = new ArrayList<>();
for(Integer c = 0; c < maxCommands; c++) {
Command command = new Command(String.format("%d", c+1));
for(Integer a = 0; a < random.nextInt(maxActions); a++) {
Action action = new Action(random, String.format("%d", a+1));
command.addAction(action);
}
commands.add(command);
}
// Print out the commands we'll execute, again to keep the results readable/understandable
System.out.println("Commands to execute: \n" + commands.stream().map(Command::toString).collect(Collectors.joining("\n")) + "\n");
// Build a Future that tries to execute all commands (supplied as Futures) in an arbitrary order
try {
CompletableFuture.allOf(commands.stream()
.map((Function<Command, CompletableFuture<Void>>) CompletableFuture::runAsync)
.collect(Collectors.toList())
.toArray(CompletableFuture[]::new)
).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
// commands.get(0).run(); // sanity check one of the command's actions run as expected
// When we execute the results, the actions should be executed in-order within a command at some point in the future
// (not started all at once), so something like:
// 0 Command-2:Action-1 scheduled at 34
// 0 Command-1:Action-1 scheduled at 21
// 0 Command-3:Action-1 scheduled at 4
// 4 Command-3:Action2 scheduled at ...
// 21 Command-1:Action-2 scheduled at ...
// 34 Command-1-Action-2 scheduled at ...
// ...
// Now how to test this...Maybe with JUnit inOrder.verify(...).run() ?
}
public static class Action implements Runnable {
private Command command;
private final Random random;
private final String name;
public Action(Random random, String name) {
this.random = random;
this.name = name;
}
public void setCommand(Command command) {
this.command = command;
}
@Override
public void run() {
// Simply sleep for a random period of time. This simulates pieces of work being done (network request, etc.)
long msTime = random.nextInt(1000);
System.out.println(new Timestamp(System.currentTimeMillis()) + ": Command-" + command.name + ":Action-" + name + " executing on Thread '" + Thread.currentThread().getName() + "' executing for " + msTime + "ms");
try {
Thread.sleep(msTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Action{" +
"name='" + name + '\'' +
'}';
}
}
public static class Command implements Runnable {
private final String name;
private final List<Action> actions = new ArrayList<>();
public Command(String name) {
this.name = name;
}
public void addAction(Action action) {
action.setCommand(this);
actions.add(action);
}
@Override
public void run() {
// If there are no actions, then do nothing
if(actions.isEmpty()) return;
// Build up a chain of futures.
// Looks like we have to build them up in reverse order, so start with the first action...
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(actions.remove(0));
// ...And then reverse the list and build the rest of the chain
// (yes we could execute backwards...but it's not common and I/others probably don't like to reason about it)
Collections.reverse(actions);
for(int i=0; i< actions.size(); i++) {
completableFuture.thenRun(actions.get(i));
}
// Execute our chain
try {
completableFuture.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Command{" +
"name='" + name + '\'' +
", actions=" + actions +
'}';
}
}
}
结果
输出和进度符合预期,但 Futures 似乎使用了 ForkJoinPool。
Commands to execute:
Command{name='1', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}]}
Command{name='2', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}]}
Command{name='3', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}, Action{name='7'}]}
Command{name='4', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}]}
Command{name='5', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}]}
2020-12-30 21:17:27.11: Command-2:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 207ms
2020-12-30 21:17:27.11: Command-4:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 930ms
2020-12-30 21:17:27.11: Command-1:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-3' executing for 948ms
2020-12-30 21:17:27.11: Command-3:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 173ms
2020-12-30 21:17:27.11: Command-5:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-11' executing for 348ms
2020-12-30 21:17:27.314: Command-3:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 890ms
2020-12-30 21:17:27.345: Command-2:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 178ms
2020-12-30 21:17:27.485: Command-5:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-11' executing for 702ms
2020-12-30 21:17:27.485: Command-5:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 161ms
2020-12-30 21:17:27.532: Command-2:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 201ms
2020-12-30 21:17:27.657: Command-5:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 257ms
2020-12-30 21:17:27.735: Command-2:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 518ms
2020-12-30 21:17:27.919: Command-5:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 258ms
2020-12-30 21:17:28.06: Command-4:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 926ms
2020-12-30 21:17:28.075: Command-1:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-3' executing for 413ms
2020-12-30 21:17:28.184: Command-5:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 77ms
2020-12-30 21:17:28.216: Command-3:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 487ms
2020-12-30 21:17:28.263: Command-2:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 570ms
2020-12-30 21:17:28.497: Command-1:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-3' executing for 273ms
2020-12-30 21:17:28.716: Command-3:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 302ms
2020-12-30 21:17:28.833: Command-2:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 202ms
2020-12-30 21:17:28.992: Command-4:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 733ms
2020-12-30 21:17:29.024: Command-3:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 756ms
2020-12-30 21:17:29.727: Command-4:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 131ms
2020-12-30 21:17:29.78: Command-3:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 920ms
2020-12-30 21:17:29.858: Command-4:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 305ms
2020-12-30 21:17:30.168: Command-4:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 612ms
2020-12-30 21:17:30.715: Command-3:Action-7 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 330ms
执行者实施
package executors;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Collectors;
public class CommandActionExample {
public static void main(String[] args) {
// Initialize some starting params
Random random = new Random();
int maxActions = 20;
int maxCommands = 5;
// Generate some commands, with a random number of actions.
// We'll use the indexes as the command and action names to keep it simple/readable
List<Command> commands = new ArrayList<>();
for(Integer c = 0; c < maxCommands; c++) {
Command command = new Command(String.format("%d", c+1));
for(Integer a = 0; a < random.nextInt(maxActions); a++) {
Action action = new Action(random, String.format("%d", a+1));
command.addAction(action);
}
commands.add(command);
}
// Print out the commands we'll execute, again to keep the results readable/understandable
System.out.println("Commands to execute: \n" + commands.stream().map(Command::toString).collect(Collectors.joining("\n")) + "\n");
ExecutorService executorService = Executors.newFixedThreadPool(20);
for(Command command:commands) executorService.submit(command);
// When we execute the results, the actions should be executed in-order within a command at some point in the future
// (not started all at once), so something like:
// 0 Command-2:Action-1 scheduled at 34
// 0 Command-1:Action-1 scheduled at 21
// 0 Command-3:Action-1 scheduled at 4
// 4 Command-3:Action2 scheduled at ...
// 21 Command-1:Action-2 scheduled at ...
// 34 Command-1-Action-2 scheduled at ...
// ...
// Now how to test this...Maybe with JUnit inOrder.verify(...).run() ?
}
public static class Action implements Runnable {
private Command command;
private final Random random;
private final String name;
public Action(Random random, String name) {
this.random = random;
this.name = name;
}
public void setCommand(Command command) {
this.command = command;
}
@Override
public void run() {
// Simply sleep for a random period of time. This simulates pieces of work being done (network request, etc.)
long msTime = random.nextInt(1000);
System.out.println(new Timestamp(System.currentTimeMillis()) + ": Command-" + command.name + ":Action-" + name + " executing on Thread '" + Thread.currentThread().getName() + "' executing for " + msTime + "ms");
try {
Thread.sleep(msTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Action{" +
"name='" + name + '\'' +
'}';
}
}
public static class Command implements Runnable {
private final String name;
private final List<Action> actions = new ArrayList<>();
public Command(String name) {
this.name = name;
}
public void addAction(Action action) {
action.setCommand(this);
actions.add(action);
}
@Override
public void run() {
// If there are no actions, then do nothing
if(actions.isEmpty()) return;
ExecutorService executor = Executors.newSingleThreadExecutor(); // Because there is only one thread, this has the effect of executing sequentially and blocking
for(Action action:actions) executor.submit(action);
}
@Override
public String toString() {
return "Command{" +
"name='" + name + '\'' +
", actions=" + actions +
'}';
}
}
}
结果
产出和进度符合预期
2020-12-31 09:43:09.952: Command-3:Action-1 executing on Thread 'pool-4-thread-1' executing for 632ms
2020-12-31 09:43:09.952: Command-4:Action-1 executing on Thread 'pool-3-thread-1' executing for 932ms
2020-12-31 09:43:09.952: Command-2:Action-1 executing on Thread 'pool-6-thread-1' executing for 586ms
2020-12-31 09:43:09.952: Command-5:Action-1 executing on Thread 'pool-5-thread-1' executing for 987ms
2020-12-31 09:43:09.952: Command-1:Action-1 executing on Thread 'pool-2-thread-1' executing for 706ms
2020-12-31 09:43:10.562: Command-2:Action-2 executing on Thread 'pool-6-thread-1' executing for 329ms
2020-12-31 09:43:10.608: Command-3:Action-2 executing on Thread 'pool-4-thread-1' executing for 503ms
2020-12-31 09:43:10.891: Command-2:Action-3 executing on Thread 'pool-6-thread-1' executing for 443ms
2020-12-31 09:43:10.9: Command-4:Action-2 executing on Thread 'pool-3-thread-1' executing for 866ms
2020-12-31 09:43:10.955: Command-5:Action-2 executing on Thread 'pool-5-thread-1' executing for 824ms
2020-12-31 09:43:11.346: Command-2:Action-4 executing on Thread 'pool-6-thread-1' executing for 502ms
2020-12-31 09:43:11.766: Command-4:Action-3 executing on Thread 'pool-3-thread-1' executing for 638ms
2020-12-31 09:43:11.779: Command-5:Action-3 executing on Thread 'pool-5-thread-1' executing for 928ms
2020-12-31 09:43:11.848: Command-2:Action-5 executing on Thread 'pool-6-thread-1' executing for 179ms
2020-12-31 09:43:12.037: Command-2:Action-6 executing on Thread 'pool-6-thread-1' executing for 964ms
2020-12-31 09:43:12.412: Command-4:Action-4 executing on Thread 'pool-3-thread-1' executing for 370ms
2020-12-31 09:43:12.709: Command-5:Action-4 executing on Thread 'pool-5-thread-1' executing for 204ms
2020-12-31 09:43:12.783: Command-4:Action-5 executing on Thread 'pool-3-thread-1' executing for 769ms
2020-12-31 09:43:12.913: Command-5:Action-5 executing on Thread 'pool-5-thread-1' executing for 188ms
2020-12-31 09:43:13.102: Command-5:Action-6 executing on Thread 'pool-5-thread-1' executing for 524ms
2020-12-31 09:43:13.555: Command-4:Action-6 executing on Thread 'pool-3-thread-1' executing for 673ms
2020-12-31 09:43:13.634: Command-5:Action-7 executing on Thread 'pool-5-thread-1' executing for 890ms
2020-12-31 09:43:14.23: Command-4:Action-7 executing on Thread 'pool-3-thread-1' executing for 147ms
2020-12-31 09:43:14.527: Command-5:Action-8 executing on Thread 'pool-5-thread-1' executing for 538ms