如果线程等待其他内容,则使线程可用

Make Thread usable if it waits for other stuff

我有 10 个对象实现了这个方法:

public CompletableFuture<TestObject> processAsync(Executor executor){
    return CompletableFuture.supplyAsync(
        () -> DoLongSynchronousRestCall(), //Takes 3 Seconds
        executor
    );
}

我已经实现了一些代码,它遍历所有 10 个对象并检查 processAsync 方法的 completableFuture 是否为 Done(),然后提取结果。这完全正常,符合预期。

“棘手”的事情来了。我的线程池包含 5 个线程,这意味着我可以同时处理 5 个对象,这导致一切都需要 6 秒。

但是我想实现的是5个Thread都执行DoLongSynchronousRestCall(),然后他们看到自己需要等待,直接找工作。这意味着一切都应该在大约 3.4 秒内完成(如果我们假设所有这些东西的开销是 0.4 秒)。

这样的事情可能吗?

例如将线程执行标记为“请再看 1 秒,同时您可以做更多的工作”?

听起来您想 运行 在后台线程上执行一堆任务并等待它们全部完成,然后收集它们返回的结果。

执行者

Executors 框架已添加到 Java 5 以简化此类工作。

将您的任务定义为 RunnableCallable。在您的情况下,Callable 因为您想要返回一个值。

在我们的任务中,我们通过让当前线程休眠三秒钟来模拟需要很长时间的工作。为了模拟要返回的结果,我们将当前时刻捕获为 Instant 对象。

Callable < Instant > task = () -> {
    Thread.sleep( Duration.ofSeconds( 3 ).toMillis() );  // Simulating work that takes a long while. 
    return Instant.now();  // Simulating result of work to be returned.
};

收集一堆要在后台线程上执行的任务。这里我们多次分配同一个任务对象。您也可以实例化多个任务对象。

List < Callable < Instant > > tasks = new ArrayList <>();  // Collect tasks to be executed. 
int limit = 5;
for ( int i = 0 ; i < limit ; i++ ) { tasks.add( task ); }

实例化 ExecutorService via Executors 实用程序 class。

您有多种执行器服务实现可供选择,具有各种行为。如果您知道您一次将有少量任务 运行,您可以选择一个缓存线程池,它会一次生成一堆线程。对于大量任务,请选择另一个 ExecutorService 以避免使您的机器不堪重负。

ExecutorService executorService = Executors.newCachedThreadPool();

仅供参考……将来,Project Loom may bring virtual threads (fibers) to Java to enable millions of simultaneous threads (if not CPU-bound). Experimental builds are available now, based on early-access Java 18. You would use another executor service

ExecutorService executorService = Executors.newVirtualThreadExecutor();

如果您有大量任务,并且尚未使用 Project Loom 技术,您可能需要选择 executor service backed by a limited number of threads

ExecutorService executorService = Executors.newFixedThreadPool( 3 );

提交我们要执行的任务集合。每个任务提交的结果是一个Future。我们将所有预期结果作为 Future object.s

的集合进行跟踪
List < Future < Instant > > futures = null;
try { futures = executorService.invokeAll( tasks ); } catch ( InterruptedException e ) { e.printStackTrace(); }

开始关闭我们的执行器服务。这会阻止提交更多任务。

executorService.shutdown();

等待所有提交的任务完成。如果任务花费的时间超过预期,则分配超时以抛出异常。

try { executorService.awaitTermination( 1 , TimeUnit.MINUTES ); } catch ( InterruptedException e ) { e.printStackTrace(); }

报告结果。循环每个 Future 对象。查看任务是否已取消。如果没有取消,那么我们知道它一定已经完成,因为我们已经过去等待执行程序服务关闭。

for ( Future < Instant > future : futures )
{
    if ( future.isCancelled() )
    {
        System.out.println( "Canceled." );
    } else
    {
        try { System.out.println( future.get() ); } catch ( InterruptedException e ) { e.printStackTrace(); } catch ( ExecutionException e ) { e.printStackTrace(); }
    }
}

当 运行 时,用一对 System.out.println 调用我们的执行程序服务的执行。您可以看到所有任务都在 21 秒后提交,它们都休眠了 3 秒,然后在 24 秒唤醒并执行。

此 运行 在 MacBook Pro(13 英寸,Apple Silicon M1,2020 年)上使用 8(4 个性能和 4 个效率)核心,使用早期访问 Java17。 =39=]

INFO - Starting execution at 2021-08-28T21:33:21.514429Z
INFO - Ending execution at 2021-08-28T21:33:24.525798Z
2021-08-28T21:33:24.524563Z
2021-08-28T21:33:24.522178Z
2021-08-28T21:33:24.522177Z
2021-08-28T21:33:24.522178Z
2021-08-28T21:33:24.522181Z

线程调度

是正确的。 Java 中的阻塞线程 不会 阻塞 CPU 核心的使用。

Java 的当前实现(至少通过 Java 17)使用主机 OS 线程作为 Java 线程。这意味着在哪个 CPU 内核上调度哪个线程 运行 以及持续多长时间,由主机 OS.

控制

目前,如果您的 Java 代码阻塞,Java 线程阻塞,因此主机 OS 线程阻塞。线程是否被阻塞,是否继续在 CPU 核心上执行取决于主机 OS。

请注意,在主机 OS 线程之间切换以在 CPU 核心上执行相对“昂贵”,开销为 CPU 周期,并且可能分配了过多的内存。这就是为什么你不应该用太多的线程让你的机器负担过重。线程数与核心数大致相同是一般准则,但它会根据正在执行的任务的性质而有所不同。

Loom 项目中的“廉价”线程

如上所述,Loom 项目 中的虚拟线程承诺使 Java 线程中的阻塞 Java 代码更“便宜”,这意味着更少的内存和减少 CPU 开销。

Loom 技术中阻塞的虚拟线程将更快更轻松地切换 CPU 核心以在另一个虚拟线程上工作。这些虚拟线程映射到“真实”主机平台 OS 线程,多对一。这些效率意味着即使是数百万个并发线程在传统硬件上也可能是合理的。

我在这里故意过度简化了。有关当前线程技术和 Project Loom 更改的完整详细信息,请参阅最近对 Ron Pressler 和 Project Loom 其他成员的演讲和采访。

你说:

i thought that if i assign 5 threads to my threadpool, then i block 5 cpu cores.

如上所述,阻塞的 Java 线程不会阻塞 CPU 核心。主机 OS 可以随时选择 运行 该核心上的其他线程,无论您的 Java 线程是否被阻塞。 “其他线程”是指 Java 个线程或其他应用程序的线程。所以请记住更大的图景:您的 Java 线程可能会随时暂停任何时间长度,因为主机 OS 认为在该机器上的当前操作条件下合适。

但是,在Java内,如果您使用由五个线程的固定大小线程池支持的执行器服务,有待处理的任务已提交但尚未启动,并且所有当前五个任务恰好都被阻塞,然后在当前阻塞清除之前,该执行程序服务不再执行任何工作。

这是 Loom 项目下虚拟线程的变化:任何阻塞的 虚拟 线程都被 JVM(不是主机 OS ), 这样它的主机 OS 线程就可以立即开始执行共享该“真实”OS 线程的许多其他虚拟线程之一。

我想你正在等待的是 Project Loom:

https://wiki.openjdk.java.net/display/loom/Main

http://cr.openjdk.java.net/~rpressler/loom/loom/sol1_part1.html

https://inside.java/tag/loom

这很快就会进入 JDK,现在可以在 EA(抢先体验)版本中进行测试和反馈。