Why does CompletableFuture not run the task with runAsync?
I want to write a test program using CompletableFuture. I have a class with two methods:
public class FutureTextData {
private ConcurrentHashMap<String,Integer> map = new ConcurrentHashMap<>();
private CompletableFuture<Void> futureForText;
public void getCharInText(String text){
futureForText = CompletableFuture.runAsync(() -> {
for (int i = 0; i < text.length()-3; i++) {
map.compute(text.substring(i+1),(key,value) -> value+=1);
map.compute(text.substring(i+2),(key,value) -> value+=1);
map.compute(text.substring(i+3),(key,value) -> value+=1);
}
for(Map.Entry<String ,Integer> entry:map.entrySet()){
if(entry.getKey().length()==3)
System.out.println(entry.getKey());
}
});
}
public void recordCharInText(String outPutFile){
/*try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
File file = new File(outPutFile);
try(BufferedWriter bf = new BufferedWriter(new FileWriter(file))){
for(Map.Entry<String ,Integer> entry:map.entrySet()){
bf.write(entry.getKey() +"<----->" + entry.getValue());
}
}catch (IOException e) {
e.printStackTrace();
}
});
}
}
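In getCharInText() I want to count the occurrences of certain substrings in the text, and in recordCharInText() I want to record the current state of the map.
When I run the program: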
FutureTextData futureTextData = new FutureTextData();
futureTextData.getCharInText(result);
futureTextData.recordCharInText("outFile.txt");
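everything finishes without any error. However, the map is not written to the file, and getCharInText() does not even appear to execute.
Can you tell me what the mistake is?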
The ForkJoinPool common pool that runs your tasks is shut down when the program exits:
/**
* Returns the common pool instance. This pool is statically
* constructed; its run state is unaffected by attempts to {@link
* #shutdown} or {@link #shutdownNow}. However this pool and any
* ongoing processing are automatically terminated upon program
* {@link System#exit}. Any program that relies on asynchronous
* task processing to complete before program termination should
* invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
* before exit.
*
* @return the common pool instance
* @since 1.8
*/
public static ForkJoinPool commonPool() {
// assert common != null : "static init error";
return common;
}
If I modify your code to add awaitQuiescence and log the execution of the tasks, I get:
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.*;
public class FutureTextData {
private ConcurrentHashMap<String,Integer> map = new ConcurrentHashMap<>();
public CompletableFuture<Void> futureForText;
public void getCharInText(String text){
futureForText = CompletableFuture.runAsync(() -> {
System.out.println("Running first task");
map.put("foo", 1);
});
}
public void recordCharInText(String outPutFile){
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Running second task");
File file = new File(outPutFile);
try(BufferedWriter bf = new BufferedWriter(new FileWriter(file))){
for(Map.Entry<String ,Integer> entry:map.entrySet()){
bf.write(entry.getKey() +"<----->" + entry.getValue());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTextData futureTextData = new FutureTextData();
futureTextData.getCharInText("xyz");
futureTextData.recordCharInText("outFile.txt");
ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.MILLISECONDS);
}
}
This produces:
Running first task
Running second task
Process finished with exit code 0
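Instead of waiting for the whole pool to go quiet, it is also possible to wait on the futures themselves. The following is only a minimal sketch of that idea: the class name JoinInsteadOfQuiescence and the shared log list are made up for illustration and are not part of the original code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class, not taken from the question.
class JoinInsteadOfQuiescence {
    // Starts two independent async tasks and blocks until both have completed.
    static List<String> runBoth() {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> first = CompletableFuture.runAsync(() -> log.add("first"));
        CompletableFuture<Void> second = CompletableFuture.runAsync(() -> log.add("second"));
        // join() blocks the calling thread until the combined future completes
        // and rethrows a task failure wrapped in a CompletionException.
        CompletableFuture.allOf(first, second).join();
        return log;
    }

    public static void main(String[] args) {
        // Both tasks have run by the time runBoth() returns; their order is not guaranteed.
        System.out.println(runBoth().size()); // prints 2
    }
}
```

Waiting on the futures has the side benefit that an exception thrown inside a task surfaces at join() rather than being dropped silently.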
You intend to compute the map and then write the result to a file:
FutureTextData futureTextData = new FutureTextData();
futureTextData.getCharInText(result); // populate map
futureTextData.recordCharInText("outFile.txt"); // write values to a file
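- The current code runs the two operations (computing the map and writing the file) concurrently on two different threads. However, you only want to write the map to the file after the CF in getCharInText has completed.
- By default, a CF uses the ForkJoinPool common pool, which is terminated when your program exits. The main thread does not wait for the threads in getCharInText and recordCharInText to finish. It calls the methods and returns immediately.
The code should look like this: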
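import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
public class FutureTextData {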
public CompletableFuture<Map<String, Integer>> getCharInText(String text) {
return CompletableFuture.supplyAsync(() -> {
var map = new HashMap<String, Integer>();
for (int i = 0; i < text.length() - 3; i++) {
// For this case, javadoc suggests to use merge instead of compute
/* map.compute(text.substring(i+1),(key,value) -> value+=1);
map.compute(text.substring(i+2),(key,value) -> value+=1);
map.compute(text.substring(i+3),(key,value) -> value+=1);*/
map.merge(text.substring(i + 1), 1, Integer::sum);
map.merge(text.substring(i + 2), 1, Integer::sum);
map.merge(text.substring(i + 3), 1, Integer::sum);
}
for (Map.Entry<String, Integer> entry : map.entrySet()) {
if (entry.getKey().length() == 3)
System.out.println(entry.getKey());
}
return map;
});
}
public void recordCharInText(String outPutFile, Map<String, Integer> map) {
// write to file logic
}
public static void main(String[] args) {
FutureTextData futureTextData = new FutureTextData();
futureTextData.getCharInText("soeerrt text")
// supply the map to next execution step
.thenAccept(map -> futureTextData.recordCharInText("outFile.txt", map));
// wait for the tasks to finish; the common pool never terminates, so wait for quiescence
ForkJoinPool.commonPool().awaitQuiescence(5, TimeUnit.SECONDS);
}
}
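The switch from compute to merge matters for more than style. With compute, the remapping function receives null for a key that is not yet in the map, so value += 1 throws a NullPointerException, and runAsync stores that exception in the future instead of printing it. The sketch below tries to make this visible; the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class, not taken from the question.
class ComputeVersusMerge {
    // Returns the exception that the async task swallowed, or null if it succeeded.
    static Throwable failingCompute() {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        CompletableFuture<Void> future = CompletableFuture.runAsync(
                // "abc" is absent, so value is null and unboxing it fails.
                () -> map.compute("abc", (key, value) -> value += 1));
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    // merge() inserts 1 for an absent key and otherwise adds 1 to the old value.
    static int countWithMerge() {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        CompletableFuture.runAsync(() -> {
            map.merge("abc", 1, Integer::sum);
            map.merge("abc", 1, Integer::sum);
        }).join();
        return map.get("abc");
    }

    public static void main(String[] args) {
        System.out.println(failingCompute() instanceof NullPointerException); // prints true
        System.out.println(countWithMerge()); // prints 2
    }
}
```

Without a join() or get() on the future, nothing reports the failure, which is consistent with a program that finishes "without any error".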
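Notes:
- I moved map inside getCharInText. Try to avoid mutating shared state from inside a lambda wherever possible.
- The body of a CF lambda can be extracted into a method. As a rule of thumb, a lambda should be at most two lines long.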
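To illustrate both notes, here is one possible shape for the whole pipeline with each lambda body extracted into a named method and the file-writing step filled in. Treat it as a sketch under assumptions: the class TextPipelineSketch, its method names, the temp file, and the choice to count substrings of exactly three characters are all illustrative guesses, not the original author's code.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not taken from the answer.
class TextPipelineSketch {
    // Counts every substring of length 3 (an assumed reading of the intent).
    static Map<String, Integer> countTrigrams(String text) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = 0; i + 3 <= text.length(); i++) {
            counts.merge(text.substring(i, i + 3), 1, Integer::sum);
        }
        return counts;
    }

    // Writes one "key<----->value" line per entry.
    static void writeCounts(Path out, Map<String, Integer> counts) {
        try (BufferedWriter writer = Files.newBufferedWriter(out)) {
            for (Map.Entry<String, Integer> entry : counts.entrySet()) {
                writer.write(entry.getKey() + "<----->" + entry.getValue());
                writer.newLine();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The write step is chained, so it starts only after counting has finished.
    static CompletableFuture<Void> run(String text, Path out) {
        return CompletableFuture
                .supplyAsync(() -> countTrigrams(text))
                .thenAccept(counts -> writeCounts(out, counts));
    }

    // Runs the pipeline against a temp file and returns the lines written.
    static List<String> demo() {
        try {
            Path out = Files.createTempFile("outFile", ".txt");
            run("abcabc", out).join(); // block until the file has been written
            return Files.readAllLines(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [abc<----->2, bca<----->1, cab<----->1]
    }
}
```

Returning the future from run() leaves it to the caller to decide how to wait, for example with join() as shown here.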