Java 运行 使用 CompletableFuture 循环

Java run loop with CompletableFuture

我正在尝试使用 CompletableFuture 并行执行 for 循环。在循环中,我使用 supplyAsync 调用 doSomething 来获取输出字符串,然后将其放入 HashMap:

    ...
    ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
    CompletableFuture<?> completableFuture = null;

    for ( int i = 0; i < numberOfRecords; i++ ) {
        final int finalI = i;
        completableFuture = CompletableFuture
                .supplyAsync( () -> doSomething( data, finalI ) )
                .thenAccept( str -> map.put( finalI, str ) );
    }
    completableFuture.join();

private String doSomething(HashMap<String, String> data, int finalI ) ) {
    ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

    for ( int k = 0; k < data.size(); k++ ) {
    //process data and add it in queue
    }
    String result = processQueueAndReturnString(queue);
    return result;
 

问题是当 for 循环几乎完成时(当 i 接近 numberOfRecords 时),doSomething 方法中的另一个 for 循环跳过一些迭代,例如如果 k=5 它只能 运行 循环直到 k=2 or 3,在这种情况下 supplyAsync( () -> doSomething( data, finalI ) ) returns null。所以看起来我的 for 循环 CompletableFuture 完成,直到一些迭代完全完成。

关于如何解决这个问题的任何建议或提示?

So it seems like my for loop with CompletableFuture finishes [before] some iterations are completely done.

示例代码中的每个循环迭代都会创建一个 CompletableFuture。如果要等待所有工作完成,则需要加入 所有 个,而不仅仅是上次迭代创建的那个。

像这样(样式已更正!):

ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
CompletableFuture<Void>[] futures = new CompletableFuture<Void>[nosRecords];

for (int i = 0; i < nosRecords; i++) {
    final int finalI = i;
    futures[i] = CompletableFuture
            .supplyAsync(() -> doSomething(data, finalI))
            .thenAccept(str -> map.put(finalI, str));
}
CompletableFuture.allOf(futures);

请注意,您需要将 CompletableFuture<?> 更改为 CompletableFuture<Void>,因为 allOf() (javadoc) 的声明需要这样做。幸运的是,thenAccept(...) 调用已经返回 CompletableFuture<Void>.


The HashMap data is not thread-safe, should it be? I just use it in method doSomething to get entry value based on index finalI. I don't process that HashMap. I just read it.

supplyAsync 调用和对其 lambda 参数的调用之间,发生在 之前。因此,只要 data 在任何 doSomething 调用的执行过程中都没有改变,它们都会在 data 映射中看到正确的值。

假设事情如您所说(并保持这种状态),可以在那里使用非同步 HashMap

看起来是正确的,适合今天的 Java。但在未来(啊哈,看看我在那里做了什么?),Java 可能会提供一种更简单、更快的方法,使用虚拟线程。

Loom 项目

Project Loom is coming to Java, with preliminary builds available now 基于抢先体验 Java 16.

一个主要特性是虚拟线程 (fibers)。这些是轻量级线程。当任何虚拟线程中的控制流阻塞时,Java 检测到阻塞并切换到另一个虚拟线程以保持 CPU 核心忙碌。这可以大大加快经常阻塞的线程代码(与严格 CPU 绑定的任务(如视频编码)相反)。

请注意,根据 Ron Pressler(Loom 项目的工作人员之一)的说法,CompletableFuture 上的许多方法中的大多数都因虚拟线程而消失了。除了调用 get,您可能不会做更多的事情。查看他的演示文稿,最新的是 2020-11-11, 2020-09-17, and 2020-07-28.

虽然我没有捕捉到您业务逻辑的所有细微差别,但我想我已经掌握了要点。与 Stephen C 类似,我收集了所有返回的 CompletableFuture 对象。然后我检查它们是否成功完成。

在 Loom 项目中,ExecutorService is now AutoCloseable。所以我们可以使用 try-with-resources 语法。 try-block 的末尾将阻塞,直到完成所有提交的任务。这种自然阻塞取代了 Stephen C.

在解决方案中看到的 CompletableFuture.allOf(futures);

示例代码

这是我们任务的一个class,一个Callable那个returns一个UUID对象。我们还让每个任务休眠一秒钟,以演示一个冗长的任务。我们的任务还将其结果记录在我们传递给其构造函数的 ConcurrentMap 中。

package work.basil.example;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;

public class DoSomething implements Callable < UUID >
{
    private Integer identifier;
    private ConcurrentMap < Integer, UUID > results;

    // Constructor
    public DoSomething ( Integer identifier , ConcurrentMap < Integer, UUID > resultsMap )
    {
        this.identifier = identifier;
        this.results = resultsMap;
    }

    @Override
    public UUID call ( ) throws Exception
    {
        Thread.sleep( Duration.ofSeconds( 1 ) );
        UUID uuid = UUID.randomUUID();
        this.results.put( this.identifier , uuid );
        return uuid;
    }
}

这是实例化代码和 运行 一堆任务。

    public static void main ( String[] args )
    {
        System.out.println( "INFO - Java version: " + Runtime.version() );
        System.out.println( "INFO - Host OS: " + System.getProperty( "os.name" ) + " version " + System.getProperty( "os.version" ) );
        System.out.println( "INFO - arch: " + System.getProperty( "os.arch" ) + " | Available processors (cores): " + Runtime.getRuntime().availableProcessors() );
        long maxMemory = Runtime.getRuntime().maxMemory();
        System.out.println( "INFO - Maximum memory (bytes): " + String.format( Locale.getDefault() , "%,d" , ( maxMemory == Long.MAX_VALUE ? "no limit" : maxMemory ) ) );
        System.out.println( "----------------------------------------------" );
        long start = System.nanoTime();

        ConcurrentMap < Integer, UUID > results = new ConcurrentSkipListMap <>();
        int countTasks = 1_000_000;
        System.out.println( "INFO - Starting a run of " + countTasks + ". " + Instant.now() );
        List < CompletableFuture < UUID > > futures = new ArrayList <>( countTasks );
        try (
                ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
        )
        {
            for ( int nthTask = 0 ; nthTask < countTasks ; nthTask++ )
            {
                executorService.submit( new DoSomething( nthTask , results ) );
            }
        }
        // At this point, flow-of-control blocks until all submitted tasks finish (are done, or are cancelled).
        List < CompletableFuture < UUID > > canceled = new ArrayList <>();
        List < CompletableFuture < UUID > > completedExceptionally = new ArrayList <>();
        for ( CompletableFuture < UUID > future : futures )
        {
            if ( future.isCancelled() )
            {
                canceled.add( future );
            } else if ( future.isCompletedExceptionally() )
            {
                completedExceptionally.add( future );
            } else if ( ! future.isDone() )
            {
                throw new IllegalStateException( "All tasks should be done at this point, normally or interrupted." );
            } else
            {
                throw new IllegalStateException( "Should not be able to reach this point." );
            }
        }

        Duration duration = Duration.ofNanos( System.nanoTime() - start );
        System.out.println( "Done at " + Instant.now() + ". Took: " + duration );
        System.out.println( "Problems… canceled size: " + canceled.size() + " | completedExceptionally size: " + completedExceptionally.size() );
        System.out.println( "Results size = " + String.format( Locale.getDefault() , "%,d" , results.size() ) );
    }
INFO - Java version: 16-loom+9-316
INFO - Host OS: Mac OS X version 10.14.6 
INFO - arch: x86_64 | Available processors (cores): 6
INFO - Maximum memory (bytes): 8,589,934,592
----------------------------------------------
INFO - Starting a run of 10000000. 2021-01-01T05:40:28.564019Z
Done at 2021-01-01T05:41:11.567852Z. Took: PT43.006895236S
Problems… canceled size: 0 | completedExceptionally size: 0
Results size = 10,000,000

运行 其中一百万个任务需要几秒钟。 运行一千万只用不到一分钟

所以你可以看到被阻塞的线程休眠一秒钟显然没有占用核心时间。如果他们在核心上花费时间,我们将等待很长时间:10,000,000 个任务 * 每个任务 1 秒 / 6 个核心 = 1,666,666 秒 = 462 小时。