为 java 中的 运行 异步作业实现优雅取消的最佳方式
Best way to implement graceful cancel for running async jobs in java
假设我的应用程序中的组件有一个类似这样的接口 运行 作业 -
IJob {
IResult execute();
void cancel();
}
我想设置我的应用程序,所以我 运行 这些作业是异步的。预期调用取消应该立即执行 return,结果表明它已被取消。
最好的设置方法是什么?我可以为 运行 创建一个 Thread 对象,它有额外的取消方法,但我也在查看我不熟悉的 Future 接口。
FutureTask 的问题是取消不正常,不允许我调用 job.cancel()。扩展 FutureTask 并实现我自己的处理方式是个好主意吗?
当你在你的任务中调用cancel
时,它会向线程运行任务发送一个中断信号。您的任务将需要定期检查该信号是否已发送,并在发送时做出相应反应:
if (Thread.interrupted()) {
performNecessaryCleanup();
return;
}
使用并发时,使用语言提供的东西,而不是手动实现。
据我了解,ExecutorService 应该是适合您的工具,因为您可以:
- 为其提供将 运行 异步并可能 return 结果的作业
- 关闭执行器以便取消所有 运行ning 作业
例子
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
List<Future<IResult>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
results.add(executor.submit(new Job(i))); //start jobs
}
executor.shutdownNow(); //attempts to stop all running jobs
//program flow immediatly continues
}
就像@JoeC 在他的回答中解释的那样,保证所有作业停止的条件是 中断 在内部 每个作业中进行管理因为每个线程在调用 shutdownNow()
.
时都会被标记为 interrupted
if (Thread.interrupted()) {
//return result cancelled
}
调用IJob.execute()或FutureTask.run()会阻塞当前线程,需要计划 IJob 或 FutureTask。
调度 FutureTask 是最好的选择,消费者可以调用 FutureTask.get() 并等待结果(即使如果你调用 IJob.cancel()).
我做了一个小演示模拟 IJob 和 IResult,它使用普通线程进行调度,在生产中你应该有一个 ExecutorService 与前面的 post 示例一样。
如您所见,主线程可以检查调用 FutureTask.isDone() 的状态,基本上您是在检查结果是否已经设置。设置的结果表示IJob的线程结束。
您几乎可以随时调用 IJob.cancel() 来完成 FutureTask[ 中包装的 IJob =55=],如果该方法的行为与您评论中的一样。
模拟工作:
public class MockJob implements IJob {
private boolean cancelled;
public MockJob() {
}
@Override
public IResult execute() {
int count = 0;
while (!cancelled) {
try {
count++;
System.out.println("Mock Job Thread: count = " + count);
if (count >= 10) {
break;
}
Thread.sleep(1000);
} catch (InterruptedException e) {
cancelled = true;
}
}
return new MockResult(cancelled, count);
}
@Override
public void cancel() {
cancelled = true;
}
}
模拟结果:
public class MockResult implements IResult {
private boolean cancelled;
private int result;
public MockResult(boolean cancelled, int result) {
this.cancelled = cancelled;
this.result = result;
}
public boolean isCancelled() {
return cancelled;
}
public int getResult() {
return result;
}
}
主要Class:
public class Main {
public static void main(String[] args) throws InterruptedException {
// Job
IJob mockJob = new MockJob();
// Async task
FutureTask<IResult> asyncTask = new FutureTask<>(mockJob::execute);
Thread mockJobThread = new Thread(asyncTask);
// Show result
Thread showResultThread = new Thread(() -> {
try {
IResult result = asyncTask.get();
MockResult mockResult = (MockResult) result;
Thread thread = Thread.currentThread();
System.out.println(String.format("%s: isCancelled = %s, result = %d",
thread.getName(),
mockResult.isCancelled(),
mockResult.getResult()
));
} catch (InterruptedException | ExecutionException ex) {
// NO-OP
}
});
// Check status
Thread monitorThread = new Thread(() -> {
try {
while (!asyncTask.isDone()) {
Thread thread = Thread.currentThread();
System.out.println(String.format("%s: asyncTask.isDone = %s",
thread.getName(),
asyncTask.isDone()
));
Thread.sleep(1000);
}
} catch (InterruptedException ex) {
// NO-OP
}
Thread thread = Thread.currentThread();
System.out.println(String.format("%s: asyncTask.isDone = %s",
thread.getName(),
asyncTask.isDone()
));
});
// Async cancel
Thread cancelThread = new Thread(() -> {
try {
// Play with this Thread.sleep, set to 15000
Thread.sleep(5000);
if (!asyncTask.isDone()) {
Thread thread = Thread.currentThread();
System.out.println(String.format("%s: job.cancel()",
thread.getName()
));
mockJob.cancel();
}
} catch (InterruptedException ex) {
// NO-OP
}
});
monitorThread.start();
showResultThread.start();
cancelThread.setDaemon(true);
cancelThread.start();
mockJobThread.start();
}
}
输出 (Thread.sleep(5000)):
Thread-2: asyncTask.isDone = false
Thread-0: count = 1
Thread-2: asyncTask.isDone = false
Thread-0: count = 2
Thread-2: asyncTask.isDone = false
Thread-0: count = 3
Thread-2: asyncTask.isDone = false
Thread-0: count = 4
Thread-2: asyncTask.isDone = false
Thread-0: count = 5
Thread-3: job.cancel()
Thread-2: asyncTask.isDone = false
Thread-1: isCancelled = true, result = 5
Thread-2: asyncTask.isDone = true
输出 (Thread.sleep(15000)):
Thread-2: asyncTask.isDone = false
Thread-0: count = 1
Thread-2: asyncTask.isDone = false
Thread-0: count = 2
Thread-2: asyncTask.isDone = false
Thread-0: count = 3
Thread-2: asyncTask.isDone = false
Thread-0: count = 4
Thread-2: asyncTask.isDone = false
Thread-0: count = 5
Thread-2: asyncTask.isDone = false
Thread-0: count = 6
Thread-2: asyncTask.isDone = false
Thread-0: count = 7
Thread-2: asyncTask.isDone = false
Thread-0: count = 8
Thread-2: asyncTask.isDone = false
Thread-0: count = 9
Thread-2: asyncTask.isDone = false
Thread-0: count = 10
Thread-1: isCancelled = false, result = 10
Thread-2: asyncTask.isDone = true
假设我的应用程序中的组件有一个类似这样的接口 运行 作业 -
IJob {
IResult execute();
void cancel();
}
我想设置我的应用程序,所以我 运行 这些作业是异步的。预期调用取消应该立即执行 return,结果表明它已被取消。
最好的设置方法是什么?我可以为 运行 创建一个 Thread 对象,它有额外的取消方法,但我也在查看我不熟悉的 Future 接口。
FutureTask 的问题是取消不正常,不允许我调用 job.cancel()。扩展 FutureTask 并实现我自己的处理方式是个好主意吗?
当你在你的任务中调用cancel
时,它会向线程运行任务发送一个中断信号。您的任务将需要定期检查该信号是否已发送,并在发送时做出相应反应:
if (Thread.interrupted()) {
performNecessaryCleanup();
return;
}
使用并发时,使用语言提供的东西,而不是手动实现。
据我了解,ExecutorService 应该是适合您的工具,因为您可以:
- 为其提供将 运行 异步并可能 return 结果的作业
- 关闭执行器以便取消所有 运行ning 作业
例子
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
List<Future<IResult>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
results.add(executor.submit(new Job(i))); //start jobs
}
executor.shutdownNow(); //attempts to stop all running jobs
//program flow immediatly continues
}
就像@JoeC 在他的回答中解释的那样,保证所有作业停止的条件是 中断 在内部 每个作业中进行管理因为每个线程在调用 shutdownNow()
.
if (Thread.interrupted()) {
//return result cancelled
}
调用IJob.execute()或FutureTask.run()会阻塞当前线程,需要计划 IJob 或 FutureTask。
调度 FutureTask 是最好的选择,消费者可以调用 FutureTask.get() 并等待结果(即使如果你调用 IJob.cancel()).
我做了一个小演示模拟 IJob 和 IResult,它使用普通线程进行调度,在生产中你应该有一个 ExecutorService 与前面的 post 示例一样。
如您所见,主线程可以检查调用 FutureTask.isDone() 的状态,基本上您是在检查结果是否已经设置。设置的结果表示IJob的线程结束。
您几乎可以随时调用 IJob.cancel() 来完成 FutureTask[ 中包装的 IJob =55=],如果该方法的行为与您评论中的一样。
模拟工作:
public class MockJob implements IJob {
private boolean cancelled;
public MockJob() {
}
@Override
public IResult execute() {
int count = 0;
while (!cancelled) {
try {
count++;
System.out.println("Mock Job Thread: count = " + count);
if (count >= 10) {
break;
}
Thread.sleep(1000);
} catch (InterruptedException e) {
cancelled = true;
}
}
return new MockResult(cancelled, count);
}
@Override
public void cancel() {
cancelled = true;
}
}
模拟结果:
public class MockResult implements IResult {
private boolean cancelled;
private int result;
public MockResult(boolean cancelled, int result) {
this.cancelled = cancelled;
this.result = result;
}
public boolean isCancelled() {
return cancelled;
}
public int getResult() {
return result;
}
}
主要Class:
public class Main {
public static void main(String[] args) throws InterruptedException {
// Job
IJob mockJob = new MockJob();
// Async task
FutureTask<IResult> asyncTask = new FutureTask<>(mockJob::execute);
Thread mockJobThread = new Thread(asyncTask);
// Show result
Thread showResultThread = new Thread(() -> {
try {
IResult result = asyncTask.get();
MockResult mockResult = (MockResult) result;
Thread thread = Thread.currentThread();
System.out.println(String.format("%s: isCancelled = %s, result = %d",
thread.getName(),
mockResult.isCancelled(),
mockResult.getResult()
));
} catch (InterruptedException | ExecutionException ex) {
// NO-OP
}
});
// Check status
Thread monitorThread = new Thread(() -> {
try {
while (!asyncTask.isDone()) {
Thread thread = Thread.currentThread();
System.out.println(String.format("%s: asyncTask.isDone = %s",
thread.getName(),
asyncTask.isDone()
));
Thread.sleep(1000);
}
} catch (InterruptedException ex) {
// NO-OP
}
Thread thread = Thread.currentThread();
System.out.println(String.format("%s: asyncTask.isDone = %s",
thread.getName(),
asyncTask.isDone()
));
});
// Async cancel
Thread cancelThread = new Thread(() -> {
try {
// Play with this Thread.sleep, set to 15000
Thread.sleep(5000);
if (!asyncTask.isDone()) {
Thread thread = Thread.currentThread();
System.out.println(String.format("%s: job.cancel()",
thread.getName()
));
mockJob.cancel();
}
} catch (InterruptedException ex) {
// NO-OP
}
});
monitorThread.start();
showResultThread.start();
cancelThread.setDaemon(true);
cancelThread.start();
mockJobThread.start();
}
}
输出 (Thread.sleep(5000)):
Thread-2: asyncTask.isDone = false
Thread-0: count = 1
Thread-2: asyncTask.isDone = false
Thread-0: count = 2
Thread-2: asyncTask.isDone = false
Thread-0: count = 3
Thread-2: asyncTask.isDone = false
Thread-0: count = 4
Thread-2: asyncTask.isDone = false
Thread-0: count = 5
Thread-3: job.cancel()
Thread-2: asyncTask.isDone = false
Thread-1: isCancelled = true, result = 5
Thread-2: asyncTask.isDone = true
输出 (Thread.sleep(15000)):
Thread-2: asyncTask.isDone = false
Thread-0: count = 1
Thread-2: asyncTask.isDone = false
Thread-0: count = 2
Thread-2: asyncTask.isDone = false
Thread-0: count = 3
Thread-2: asyncTask.isDone = false
Thread-0: count = 4
Thread-2: asyncTask.isDone = false
Thread-0: count = 5
Thread-2: asyncTask.isDone = false
Thread-0: count = 6
Thread-2: asyncTask.isDone = false
Thread-0: count = 7
Thread-2: asyncTask.isDone = false
Thread-0: count = 8
Thread-2: asyncTask.isDone = false
Thread-0: count = 9
Thread-2: asyncTask.isDone = false
Thread-0: count = 10
Thread-1: isCancelled = false, result = 10
Thread-2: asyncTask.isDone = true