让其他线程等待,直到 ScheduledExecutorService 完成所有任务

Make other threads wait until ScheduledExecutorService completes all tasks

主要目标是 运行 一种使用 ScheduledExecutorService 的方法,并等待其所有任务完成后再恢复主线程。

我在自定义 Scheduler class 中创建了一个实用程序方法,它接受任何 Runnable:

public void scheduleFunction(Runnable function) {
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    final ScheduledFuture<?> producerHandle = scheduler.scheduleAtFixedRate(function, initialDelay, interval, MILLISECONDS);
    scheduler.schedule(() -> { producerHandle.cancel(true); }, timeout, MILLISECONDS);
}

当我需要在预定模式下执行它的方法时,在其他 class 中像这样使用它:

public void sendToKafka() {
  Scheduler.scheduleFunction(this::produce);
}

这项工作很好,除了一件事。 当主线程到达 sendToKafka() 时,它调用 Scheduler 来调度一个函数。 dut主线程保持运行ning,同时Scheduled函数开始工作

实际结果: 两个线程 运行 同时

预期结果: 当调度程序线程启动时,主线程停止并等待调度程序完成执行

我怎样才能做到这一点?

由于在该方法中创建和放弃一个ScheduledExecutorService,所以应该调用shutdown()支持及时释放资源。如果这样做,您可以调用 awaitTermination 等待所有挂起的作业完成。

public void scheduleFunction(Runnable function) {
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    final ScheduledFuture<?> producerHandle
        = scheduler.scheduleAtFixedRate(function, initialDelay, interval, MILLISECONDS);
    scheduler.schedule(() -> {
        producerHandle.cancel(true);
        scheduler.shutdown();
    }, timeout, MILLISECONDS);
    try {
        scheduler.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}

注意,当你不需要打扰时,你可以简单地使用

public void scheduleFunction(Runnable function) {
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    scheduler.scheduleAtFixedRate(function, initialDelay, interval, MILLISECONDS);
    scheduler.schedule(() -> scheduler.shutdown(), timeout, MILLISECONDS);
    try {
        scheduler.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}

因为关闭 ScheduledExecutorService 意味着停止重新安排作业;仅当有正在进行的执行时才会完成,awaitTermination 将等待它。