Why does CompletableFuture not run the task with runAsync?

I want to write a test program using CompletableFuture. I have a class with two methods:

public class FutureTextData {
    private ConcurrentHashMap<String,Integer> map = new ConcurrentHashMap<>();
    private  CompletableFuture<Void> futureForText;

    public void getCharInText(String text){
        futureForText = CompletableFuture.runAsync(() -> {
            for (int i = 0; i < text.length()-3; i++) {
                map.compute(text.substring(i+1),(key,value) -> value+=1);
                map.compute(text.substring(i+2),(key,value) -> value+=1);
                map.compute(text.substring(i+3),(key,value) -> value+=1);
            }
            for(Map.Entry<String ,Integer> entry:map.entrySet()){
                if(entry.getKey().length()==3)
                    System.out.println(entry.getKey());

            }

        });
    }

    public void recordCharInText(String outPutFile){
        /*try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }*/
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            File file = new File(outPutFile);

            try(BufferedWriter bf = new BufferedWriter(new FileWriter(file))){
                for(Map.Entry<String ,Integer> entry:map.entrySet()){
                    bf.write(entry.getKey() +"<----->" + entry.getValue());

                }

            }catch (IOException e) {
                e.printStackTrace();
            }
        });
    }
}

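In getCharInText() I want to count the occurrences of certain substrings in the text, and in recordCharInText() I want to record the current state of the map.

When I run the program: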

FutureTextData futureTextData = new FutureTextData();
futureTextData.getCharInText(result);
futureTextData.recordCharInText("outFile.txt");

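then everything completes without any errors, but the map is not written to the file, and even getCharInText() does not appear to be executed.

Can you tell me what the mistake is?

The ForkJoinPool common pool that runs your tasks is shut down when the program exits: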

    /**
     * Returns the common pool instance. This pool is statically
     * constructed; its run state is unaffected by attempts to {@link
     * #shutdown} or {@link #shutdownNow}. However this pool and any
     * ongoing processing are automatically terminated upon program
     * {@link System#exit}.  Any program that relies on asynchronous
     * task processing to complete before program termination should
     * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
     * before exit.
     *
     * @return the common pool instance
     * @since 1.8
     */
    public static ForkJoinPool commonPool() {
        // assert common != null : "static init error";
        return common;
    }

If I modify your code to add awaitQuiescence and log the execution of the tasks, I get the following:

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.*;

public class FutureTextData {
    private ConcurrentHashMap<String,Integer> map = new ConcurrentHashMap<>();
    public  CompletableFuture<Void> futureForText;

    public void getCharInText(String text){
        futureForText = CompletableFuture.runAsync(() -> {
            System.out.println("Running first task");
            map.put("foo", 1);
        });
    }

    public void recordCharInText(String outPutFile){
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("Running second task");

            File file = new File(outPutFile);

            try(BufferedWriter bf = new BufferedWriter(new FileWriter(file))){
                for(Map.Entry<String ,Integer> entry:map.entrySet()){
                    bf.write(entry.getKey() +"<----->" + entry.getValue());

                }

            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTextData futureTextData = new FutureTextData();
        futureTextData.getCharInText("xyz");
        futureTextData.recordCharInText("outFile.txt");
        ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.MILLISECONDS);
    }
}

Output:

Running first task
Running second task

Process finished with exit code 0
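
An alternative to waiting on the pool is to wait on the futures themselves. The following is a minimal sketch of that idea, not the original poster's code: the class name JoinDemo and the method runBoth are made up for illustration, and the two tasks just append a marker to a list.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class, not part of the question's code.
class JoinDemo {

    // Starts two independent async tasks and blocks until both have finished.
    static List<String> runBoth() {
        List<String> log = new CopyOnWriteArrayList<>();

        CompletableFuture<Void> first = CompletableFuture.runAsync(() -> log.add("first"));
        CompletableFuture<Void> second = CompletableFuture.runAsync(() -> log.add("second"));

        // allOf(...) completes when both futures are done; join() blocks the caller
        // and rethrows a task failure wrapped in a CompletionException.
        CompletableFuture.allOf(first, second).join();
        return log;
    }

    public static void main(String[] args) {
        // The order of the two entries is not guaranteed, only that both are present.
        System.out.println(runBoth());
    }
}
```

Compared with awaitQuiescence, this needs no timeout guess, and a failed task shows up as an exception in main instead of disappearing silently.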

You intend to compute the map and then write the result to a file:

FutureTextData futureTextData = new FutureTextData();
futureTextData.getCharInText(result); // populate map
futureTextData.recordCharInText("outFile.txt"); // write values to a file

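  • The current code runs both operations (computing the map and writing the file) concurrently on two different threads. However, you want to write the map to the file only after the CF in getCharInText has completed.
  • By default, CF uses the ForkJoinPool common pool, which is terminated when your program exits. The main thread does not wait for the tasks started in getCharInText and recordCharInText to finish; it calls the methods and returns immediately.

The code should look like this: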

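import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class FutureTextData {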

    public CompletableFuture<Map<String, Integer>> getCharInText(String text) {
        return CompletableFuture.supplyAsync(() -> {
            var map = new HashMap<String, Integer>();
            
            for (int i = 0; i < text.length() - 3; i++) {
                // For this case, the Javadoc suggests using merge instead of compute
                
               /* map.compute(text.substring(i+1),(key,value) -> value+=1);
                map.compute(text.substring(i+2),(key,value) -> value+=1);
                map.compute(text.substring(i+3),(key,value) -> value+=1);*/
                
                map.merge(text.substring(i + 1), 1, Integer::sum);
                map.merge(text.substring(i + 2), 1, Integer::sum);
                map.merge(text.substring(i + 3), 1, Integer::sum);
            }

            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                if (entry.getKey().length() == 3)
                    System.out.println(entry.getKey());
            }

            return map;
        });
    }

    public void recordCharInText(String outPutFile, Map<String, Integer> map) {
       // write to file logic
    }

    public static void main(String[] args) throws InterruptedException {
        FutureTextData futureTextData = new FutureTextData();
        futureTextData.getCharInText("soeerrt text")
                       // supply the map to next execution step
                      .thenAccept(map -> futureTextData.recordCharInText("outFile.txt", map));

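        // wait for the tasks to finish; on the common pool, awaitTermination
        // behaves like awaitQuiescence because the pool itself never terminates
        ForkJoinPool.commonPool().awaitTermination(5, TimeUnit.SECONDS);
    }
}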
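
The switch from compute to merge matters for more than style. With compute, the value passed to the lambda is null for a key that is not yet in the map, so value+=1 throws a NullPointerException; runAsync captures that exception in the future, and nothing is printed unless someone inspects the future. The sketch below illustrates this under the assumption of a single key "abc"; the class and method names are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class, not part of the question's code.
class SwallowedFailureDemo {

    // Returns the simple name of the exception that failed the task, or "none".
    static String failureOfCompute() {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        CompletableFuture<Void> future = CompletableFuture.runAsync(() ->
                // value is null for an absent key, so unboxing it throws
                map.compute("abc", (key, value) -> value + 1));
        try {
            future.join();
            return "none";
        } catch (CompletionException e) {
            // join() wraps the original exception; the cause is what the task threw
            return e.getCause().getClass().getSimpleName();
        }
    }

    // Same counting with merge, which supplies the initial value for an absent key.
    static int countWithMerge() {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        CompletableFuture.runAsync(() -> {
            map.merge("abc", 1, Integer::sum);
            map.merge("abc", 1, Integer::sum);
        }).join();
        return map.get("abc");
    }

    public static void main(String[] args) {
        System.out.println(failureOfCompute());
        System.out.println(countWithMerge());
    }
}
```

So even with a wait in main, the first task in the question would likely have failed before reaching its print loop; calling join() on the stored future is one way to make such failures visible.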

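Notes:

  • I moved the map inside getCharInText. Always try to avoid mutating outside state from within a lambda.
  • The body of a lambda passed to a CF can be extracted into a method. A lambda should be at most 2 lines long.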
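
Both notes can be sketched together as follows. This is only an illustration under simplifying assumptions: countTrigrams and format are hypothetical helpers, the counting is reduced to 3-character windows rather than the substrings used in the question, and the result is returned as a string instead of being written to a file.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical demo class, not part of the answer's code.
class ExtractedStepsDemo {

    // Hypothetical helper: counts every 3-character window of the text.
    // The map is local, so the async step mutates no shared state.
    static Map<String, Integer> countTrigrams(String text) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = 0; i + 3 <= text.length(); i++) {
            counts.merge(text.substring(i, i + 3), 1, Integer::sum);
        }
        return counts;
    }

    // Hypothetical helper: renders entries in the "key<----->value" format from the question.
    static String format(Map<String, Integer> counts) {
        return counts.entrySet().stream()
                .map(e -> e.getKey() + "<----->" + e.getValue())
                .collect(Collectors.joining("\n"));
    }

    // Each stage is a one-line lambda or a method reference.
    static String report(String text) {
        return CompletableFuture.supplyAsync(() -> countTrigrams(text))
                .thenApply(ExtractedStepsDemo::format)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(report("abcabc"));
    }
}
```

Keeping the stages as named methods also makes each step testable without any asynchronous machinery.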