在没有线程暂停的情况下执行 ExecutorService 中的任务

Execution of Tasks in ExecutorService without Thread pauses

我有一个有 8 个线程的线程池

private static final ExecutorService SERVICE = Executors.newFixedThreadPool(8);

我的机制模拟 100 个用户(100 个任务)的工作:

List<Callable<Boolean>> callableTasks = new ArrayList<>();
for (int i = 0; i < 100; i++) { // Number of users == 100
    callableTasks.add(new Task(client));
}
SERVICE.invokeAll(callableTasks);
SERVICE.shutdown();

用户执行生成文档的任务。

  1. 获取任务的UUID;
  2. 每 10 秒获取一次任务状态;
  3. 如果任务准备就绪,获取文档。
public class Task implements Callable<Boolean> {

    private final ReportClient client;

    public Task(ReportClient client) {
        this.client = client;
    }

    @Override
    public Boolean call() {
        final var uuid = client.createDocument(documentId);
        GetStatusResponse status = null;
        do {
            try {
                Thread.sleep(10000); // This stop current thread, but not a Task!!!!
            } catch (InterruptedException e) {
                return Boolean.FALSE;
            }
            status = client.getStatus(uuid);
        } while (Status.PENDING.equals(status.status()));
        final var document = client.getReport(uuid);
        return Boolean.TRUE;
    }
}

我想把空闲时间(10 秒)给另一个任务。但是当命令Thread.sleep(10000);被调用时,当前线程暂停执行。前 8 个任务暂停,92 个任务等待 10 秒。如何同时执行 100 个正在进行的任务?

编辑:我刚刚发布了这个答案并意识到您似乎正在使用该代码来模拟真实用户与某些系统的交互。我强烈建议为此使用负载测试实用程序,而不是尝试自己设计。然而,在那种情况下,仅使用 CachedThreadPool 可能会成功,尽管可能不是一个非常健壮或可扩展的解决方案。

Thread.sleep() 行为按预期工作:它挂起线程让 CPU 执行其他线程。 请注意,在这种状态下,线程可能会因与您的代码无关的多种原因而中断,在这种情况下,您的任务 returns false: 我假设您实际上有一些重试逻辑行。

所以你想要两个相互排斥的东西:一方面,如果文档没有准备好,线程应该可以自由地做其他事情,但应该以某种方式 return 并再次检查该文档的状态10 秒后。

这意味着你必须选择:

  • 您肯定需要 once-every-10 秒检查每个文档 - 在这种情况下,可以使用缓存线程池并让它根据需要生成尽可能多的线程,请记住你将承担大量线程的开销,这些线程几乎什么都不做。

  • 或者,您可以先启动异步文档创建过程,然后仅检查可调用文件中的状态,并根据需要重试。

类似于:

public class Task implements Callable<Boolean> {
    private final ReportClient client;
    private final UUID uuid;
    // all args constructor omitted for brevity
    @Override
    public Boolean call() {
        GetStatusResponse status = client.getStatus(uuid);
        if (Status.PENDING.equals(status.status())) {
            final var document = client.getReport(uuid);
            return Boolean.TRUE;
        } else {
            return Boolean.FALSE; //retry next time
        }
    }
}

List<Callable<Boolean>> callableTasks = new ArrayList<>();
for (int i = 0; i < 100; i++) { 
    var uuid = client.createDocument(documentId); //not sure where documentId comes from here in your code
    callableTasks.add(new Task(client, uuid));
}

List<Future<Boolean>> results = SERVICE.invokeAll(callableTasks); 
// retry logic until all results come back as `true` here

这假设 createDocument 相对高效,但该阶段也可以并行化,您只需要使用单独的 Runnable 任务列表并使用执行程序服务调用它们。 请注意,我们还假设文档的状态最终确实会更改为 PENDING 以外的其他状态,但情况很可能并非如此。您可能希望重试超时。

关于今天 Java, 看起来是正确的。您既想吃蛋糕也想吃蛋糕,因为您希望线程在重复任务之前休眠,但您还希望该线程执行其他工作。这在今天是不可能的,但将来可能会。

Loom 计划

在当前 Java 中,Java 线程直接映射到主机 OS 线程。在所有常见的 OSes 中,例如 macOS、BSD、Linux、Windows 等,当代码在主机线程中执行时阻塞(停止等待睡眠,或存储 I/O,或网络 I/O,等等)线程也会阻塞。被阻塞的线程挂起,主机 OS 通常在那个未使用的核心上运行另一个线程。但关键点是挂起的线程不会执行进一步的工作,直到您阻塞调用 sleep returns.

这张图片在 not-so-distant 将来可能会改变。 Project Loom seeks to add virtual threads 到 Java 中的并发设施。

在这项新技术中,许多 Java 虚拟线程被映射到每个主机 OS 线程。兼顾许多 Java 个虚拟线程是由 JVM 而不是 OS 管理的。当 JVM 检测到虚拟线程的执行代码被阻塞时,该虚拟线程被“停放”,由 JVM 搁置,另一个虚拟线程换出以在该“真实”主机 OS 线程上执行。当另一个线程 returns 从它的阻塞调用中退出时,它可以被重新分配给一个“真正的”主机 OS 线程以进一步执行。在 Project Loom 下,主机 OS 线程保持忙碌,在任何挂起的虚拟线程有工作要做时从不空闲。

虚拟线程之间的这种交换非常高效,因此在传统计算机硬件上一次可以 运行 成千上万个线程。

使用虚拟线程,您的代码确实会像您希望的那样工作:Java 中的阻塞调用不会阻塞主机 OS 线程。但虚拟线程是实验性的,仍在开发中,计划作为 preview feature in Java 19. Early-access builds of Java 19 with Loom technology included are available now 供您尝试。但是对于今天的生产部署,您需要遵循 Yevgeniy 的回答中的建议。


请对我的报道持保留态度,因为我不是并发方面的专家。您可以从 Project Loom 团队成员(包括 Ron Pressler 和 Alan Bateman)的文章、访谈和演示中听到真正的专家。

在您的情况下,您似乎需要每隔 x 秒检查一次是否满足某个条件。事实上,从您的代码来看,文档生成似乎是异步的,之后 Task 一直在做的只是等待文档生成发生。

您可以从 Thread-Main 启动每个文档生成并使用 ScheduledThreadPoolExecutor 每隔 x 秒验证一次文档生成是否已完成。此时,您检索结果并取消相应任务的调度。

基本上,一个 ConcurrentHashMap 由 thread-main 和您安排的任务 (mapRes) 共享,而另一个 mapTask 仅在 thread-main 中本地使用跟踪每个 Task.

返回的 ScheduledFuture
public class Main {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(8);

        //ConcurrentHashMap shared among the submitted tasks where each Task updates its corresponding outcome to true as soon as the document has been produced
        ConcurrentHashMap<Integer, Boolean> mapRes = new ConcurrentHashMap<>();
        for (int i = 0; i < 100; i++) {
            mapRes.put(i, false);
        }

        String uuid;
        ScheduledFuture<?> schedFut;

        //HashMap containing the ScheduledFuture returned by scheduling each Task to cancel their repetition as soon as the document has been produced
        Map<String, ScheduledFuture<?>> mapTask = new HashMap<>();
        for (int i = 0; i < 100; i++) {
            //Starting the document generation from the thread-main
            uuid = client.createDocument(documentId);

            //Scheduling each Task 10 seconds apart from one another and with an initial delay of i*10 to not start all of them at the same time
            schedFut = pool.scheduleWithFixedDelay(new Task(client, uuid, mapRes), i * 10, 10000, TimeUnit.MILLISECONDS);

            //Adding the ScheduledFuture to the map
            mapTask.put(uuid, schedFut);
        }

        //Keep checking the outcome of each task until all of them have been canceled due to completion
        while (!mapTasks.values().stream().allMatch(v -> v.isCancelled())) {
            for (Integer key : mapTasks.keySet()) {
                //Canceling the i-th task scheduling if:
                //  - Its result is positive (i.e. its verification is terminated)
                //  - The task hasn't been canceled already
                if (mapRes.get(key) && !mapTasks.get(key).isCancelled()) {
                    schedFut = mapTasks.get(key);
                    schedFut.cancel(true);
                }
            }

            //... eventually adding a sleep to check the completion every x seconds ...
        }

        pool.shutdown();
    }
}

class Task implements Runnable {

    private final ReportClient client;
    private final String uuid;

    private final ConcurrentHashMap mapRes;

    public Task(ReportClient client, String uuid, ConcurrentHashMap mapRes) {
        this.client = client;
        this.uuid = uuid;
        this.mapRes = mapRes;
    }

    @Override
    public void run() {
        //This is taken form your code and I'm assuming that if it's not pending then it's completed
        if (!Status.PENDING.equals(client.getStatus(uuid).status())) {
            mapRes.replace(uuid, true);
        }
    }
}

我已经在本地测试了你的情况,模拟了 n Tasks 等待创建具有相同 ID(或你的情况下为 uuid)的文件夹的场景。我会 post 它就在这里作为示例,以防你想先尝试一些更简单的东西。

public class Main {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor pool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2);

        ConcurrentHashMap<Integer, Boolean> mapRes = new ConcurrentHashMap<>();
        for (int i = 0; i < 16; i++) {
            mapRes.put(i, false);
        }

        ScheduledFuture<?> schedFut;
        Map<Integer, ScheduledFuture<?>> mapTasks = new HashMap<>();
        for (int i = 0; i < 16; i++) {
            schedFut = pool.scheduleWithFixedDelay(new MyTask(i, mapRes), i * 20, 3000, TimeUnit.MILLISECONDS);
            mapTasks.put(i, schedFut);
        }

        while (!mapTasks.values().stream().allMatch(v -> v.isCancelled())) {
            for (Integer key : mapTasks.keySet()) {
                if (mapRes.get(key) && !mapTasks.get(key).isCancelled()) {
                    schedFut = mapTasks.get(key);
                    schedFut.cancel(true);
                }
            }
        }

        pool.shutdown();
    }
}

class MyTask implements Runnable {
    private int num;
    private ConcurrentHashMap mapRes;

    public MyTask(int num, ConcurrentHashMap mapRes) {
        this.num = num;
        this.mapRes = mapRes;
    }

    @Override
    public void run() {
        System.out.println("Task " + num + " is checking whether the folder exists: " + Files.exists(Path.of("./" + num)));
        if (Files.exists(Path.of("./" + num))) {
            mapRes.replace(num, true);
        }
    }
}