如何识别被取消的 ScheduledFuture 是否真的没有被取消?

How to identify if cancelled ScheduledFuture is actually not cancelled?

我正在使用 ScheduledExecutorService 并提交这样的任务:

future = scheduledExecutorService.schedule(myRunnableTask, delay, timeunit)

然而,某个事件可能会在不确定的时间后发生,这表明不再需要此任务。所以我需要取消这个任务,我正在使用

boolean cancelled = future.cancel(false)行。

取消后,我必须根据提交的运行实际是否运行采取不同的行动。让我们首先进入 Oracle 文档并阅读 cancelled 标志的含义:

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html#cancel(boolean)

Returns: false if the task could not be cancelled, typically because it has already completed normally; true otherwise

这就是关于 return 值的全部内容。写这行文字的人似乎不确定这里的 false return 值,但我想我可以接受。

现在让我们关注案例,当它 returns true 时。这里有两种可能:

  1. 任务实际上已被取消并且运行永远无法运行。
  2. 运行nable 正在运行ning 中,因此无法取消。 (除非我做了一些我不想做的线程中断逻辑)

我可以接受这两种情况的发生,但我想知道实际发生了哪一种情况并采取相应的措施。如果 运行nable 正在进行中,那么我可以接受它完成它的工作,我想等待它完成然后做一件事。但如果它被取消并且根本不会 运行,我想做另一件事。

你能推荐一种方法吗?我错过了什么吗?

您可以通过以下方式实现您的目标:

  • 一个 Set<Runnable> 跟踪线程池已开始执行的 Runnable 个。
  • A Map<ScheduledFuture<?>, Runnable>ScheduledFuture<?> 映射到其各自的 Runnable
    • 安排任务后,您应立即将 ScheduledFuture 及其各自的 Runnable 添加到 Map
    • 如果插入到 Map 中的操作是通过调度任务本身自动执行的,那么您可以避免 ScheduledFuture 从未添加到 Map 的边缘情况,即使在之后它被取消了。

我建议更改您的 ScheduledExecutorService to a ScheduledThreadPoolExecutor, which will allow you to override its beforeExecute(Thread, Runnable) 方法;此方法在任务被池 运行 之前立即调用,因为它已经分配了一个将执行任务的线程。

覆盖此方法时,您可以将 Runnable 添加到 Set<Runnable>

然后,当 ScheduledFuture 被取消时,您可以调用 set.contains(map.get(future)),它会告诉您 RunnableScheduledFuture 映射到)是否被执行。


请注意,您的 Set<Runnable>Map<ScheduledFuture<?>, Runnable> 实现可能必须是线程安全的,以避免可能的竞争条件。

我最终为这个问题写了这样的东西。可以在 https://github.com/nuzayats/cancellabletaskexecutor

找到源代码和一些单元测试
public class CancellableTaskExecutor {

    private final ScheduledExecutorService es;
    private final Logger log;

    /**
     * For a unit test to replicate a particular timing
     */
    private final Runnable hookBetweenCancels;

    public CancellableTaskExecutor(ScheduledExecutorService es, Logger log) {
        this(es, log, () -> {
            // nop
        });
    }

    // For unit tests
    CancellableTaskExecutor(ScheduledExecutorService es, Logger log, Runnable hookBetweenCancels) {
        this.es = es;
        this.log = log;
        this.hookBetweenCancels = hookBetweenCancels;
    }

    public Execution schedule(Runnable task, long delay, TimeUnit unit) {
        CancellableRunnable runnable = new CancellableRunnable(task);
        ScheduledFuture<?> future = es.schedule(runnable, delay, unit);
        return new Execution(future, runnable);
    }

    public class Execution {

        private final ScheduledFuture<?> future;
        private final CancellableRunnable runnable;

        private Execution(ScheduledFuture<?> future, CancellableRunnable runnable) {
            this.future = future;
            this.runnable = runnable;
        }

        /**
         * @return true when the task has been successfully cancelled and it's guaranteed that
         * the task won't get executed. otherwise false
         */
        public boolean cancel() {
            boolean cancelled = runnable.cancel();
            hookBetweenCancels.run();

            // the return value of this call is unreliable; see 
            future.cancel(false);

            return cancelled;
        }
    }

    private class CancellableRunnable implements Runnable {

        private final AtomicBoolean cancelledOrStarted = new AtomicBoolean();
        private final Runnable task;

        private CancellableRunnable(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            if (!cancelledOrStarted.compareAndSet(false, true)) {
                return; // cancelled, forget about the task
            }
            try {
                task.run();
            } catch (Throwable e) {
                log.log(Level.WARNING, "Uncaught Exception", e);
            }
        }

        boolean cancel() {
            return cancelledOrStarted.compareAndSet(false, true);
        }
    }
}