为 java 中的 运行 异步作业实现优雅取消的最佳方式

Best way to implement graceful cancel for running async jobs in java

假设我的应用程序中的组件有一个类似这样的接口 运行 作业 -

IJob {
    IResult execute();
    void cancel();
}

我想设置我的应用程序,所以我 运行 这些作业是异步的。预期调用取消应该立即执行 return,结果表明它已被取消。

最好的设置方法是什么?我可以为 运行 创建一个 Thread 对象,它有额外的取消方法,但我也在查看我不熟悉的 Future 接口。

FutureTask 的问题是取消不正常,不允许我调用 job.cancel()。扩展 FutureTask 并实现我自己的处理方式是个好主意吗?

当你在你的任务中调用cancel时,它会向线程运行任务发送一个中断信号。您的任务将需要定期检查该信号是否已发送,并在发送时做出相应反应:

if (Thread.interrupted()) {
    performNecessaryCleanup();
    return;
}

使用并发时,使用语言提供的东西,而不是手动实现。

据我了解,ExecutorService 应该是适合您的工具,因为您可以:

  • 为其提供将 运行 异步并可能 return 结果的作业
  • 关闭执行器以便取消所有 运行ning 作业

例子

public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    List<Future<IResult>> results = new ArrayList<>();

    for (int i = 0; i < 10; i++) {
        results.add(executor.submit(new Job(i))); //start jobs
    }

    executor.shutdownNow(); //attempts to stop all running jobs

    //program flow immediatly continues
}

就像@JoeC 在他的回答中解释的那样,保证所有作业停止的条件是 中断内部 每个作业中进行管理因为每个线程在调用 shutdownNow().

时都会被标记为 interrupted
if (Thread.interrupted()) {
    //return result cancelled
}

调用IJob.execute()FutureTask.run()会阻塞当前线程,需要计划 IJobFutureTask

调度 FutureTask 是最好的选择,消费者可以调用 FutureTask.get() 并等待结果(即使如果你调用 IJob.cancel()).

我做了一个小演示模拟 IJobIResult,它使用普通线程进行调度,在生产中你应该有一个 ExecutorService 与前面的 post 示例一样。

如您所见,主线程可以检查调用 FutureTask.isDone() 的状态,基本上您是在检查结果是否已经设置。设置的结果表示IJob的线程结束。

您几乎可以随时调用 IJob.cancel() 来完成 FutureTask[ 中包装的 IJob =55=],如果该方法的行为与您评论中的一样。

模拟工作:

public class MockJob implements IJob {

    private boolean cancelled;

    public MockJob() {
    }

    @Override
    public IResult execute() {
        int count = 0;
        while (!cancelled) {
            try {
                count++;
                System.out.println("Mock Job Thread: count = " + count);
                if (count >= 10) {
                    break;
                }
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                cancelled = true;
            }
        }
        return new MockResult(cancelled, count);
    }

    @Override
    public void cancel() {
        cancelled = true;
    }
}

模拟结果:

public class MockResult implements IResult {

    private boolean cancelled;
    private int result;

    public MockResult(boolean cancelled, int result) {
        this.cancelled = cancelled;
        this.result = result;
    }

    public boolean isCancelled() {
        return cancelled;
    }

    public int getResult() {
        return result;
    }
}

主要Class:

public class Main {

    public static void main(String[] args) throws InterruptedException {
        // Job
        IJob mockJob = new MockJob();

        // Async task
        FutureTask<IResult> asyncTask = new FutureTask<>(mockJob::execute);
        Thread mockJobThread = new Thread(asyncTask);

        // Show result
        Thread showResultThread = new Thread(() -> {
            try {
                IResult result = asyncTask.get();
                MockResult mockResult = (MockResult) result;
                Thread thread = Thread.currentThread();
                System.out.println(String.format("%s: isCancelled = %s, result = %d",
                        thread.getName(),
                        mockResult.isCancelled(),
                        mockResult.getResult()
                ));
            } catch (InterruptedException | ExecutionException ex) {
                // NO-OP
            }
        });

        // Check status
        Thread monitorThread = new Thread(() -> {
            try {
                while (!asyncTask.isDone()) {
                    Thread thread = Thread.currentThread();
                    System.out.println(String.format("%s: asyncTask.isDone = %s",
                            thread.getName(),
                            asyncTask.isDone()
                    ));
                    Thread.sleep(1000);
                }
            } catch (InterruptedException ex) {
                // NO-OP
            }
            Thread thread = Thread.currentThread();
            System.out.println(String.format("%s: asyncTask.isDone = %s",
                    thread.getName(),
                    asyncTask.isDone()
            ));
        });

        // Async cancel
        Thread cancelThread = new Thread(() -> {
            try {
                // Play with this Thread.sleep, set to 15000
                Thread.sleep(5000);
                if (!asyncTask.isDone()) {
                    Thread thread = Thread.currentThread();
                    System.out.println(String.format("%s: job.cancel()",
                            thread.getName()
                    ));
                    mockJob.cancel();
                }
            } catch (InterruptedException ex) {
                // NO-OP
            }
        });

        monitorThread.start();
        showResultThread.start();
        cancelThread.setDaemon(true);
        cancelThread.start();
        mockJobThread.start();
    }
}

输出 (Thread.sleep(5000)):

Thread-2: asyncTask.isDone = false
Thread-0: count = 1
Thread-2: asyncTask.isDone = false
Thread-0: count = 2
Thread-2: asyncTask.isDone = false
Thread-0: count = 3
Thread-2: asyncTask.isDone = false
Thread-0: count = 4
Thread-2: asyncTask.isDone = false
Thread-0: count = 5
Thread-3: job.cancel()
Thread-2: asyncTask.isDone = false
Thread-1: isCancelled = true, result = 5
Thread-2: asyncTask.isDone = true

输出 (Thread.sleep(15000)):

Thread-2: asyncTask.isDone = false
Thread-0: count = 1
Thread-2: asyncTask.isDone = false
Thread-0: count = 2
Thread-2: asyncTask.isDone = false
Thread-0: count = 3
Thread-2: asyncTask.isDone = false
Thread-0: count = 4
Thread-2: asyncTask.isDone = false
Thread-0: count = 5
Thread-2: asyncTask.isDone = false
Thread-0: count = 6
Thread-2: asyncTask.isDone = false
Thread-0: count = 7
Thread-2: asyncTask.isDone = false
Thread-0: count = 8
Thread-2: asyncTask.isDone = false
Thread-0: count = 9
Thread-2: asyncTask.isDone = false
Thread-0: count = 10
Thread-1: isCancelled = false, result = 10
Thread-2: asyncTask.isDone = true