CompletableFuture 连同使用 FileReader 读取,程序不会退出

CompletableFuture along with reading using FileReader, the program doesn't exit

背景

构建一个数据管道,其中接收到的每条消息都将被异步处理。 试图通过

模拟行为

代码

 BufferedReader reader = null;
 ExecutorService service = Executors.newFixedThreadPool(4);
 try {
   String filepath = str[0];
   FileReaderAsync fileReaderAsync = new FileReaderAsync();
   reader = new BufferedReader(new FileReader(filepath));
   Random r = new Random();
   String line; 
   while ((line = reader.readLine()) != null) {
          Integer val = Integer.valueOf(line.trim());
          int randomInt = r.nextInt(5);
          Thread.sleep(randomInt * 100);
          CompletableFuture.supplyAsync(() -> {
              System.out.println("Square : " + val);
              return val * val;
          }, service) 
             .thenApplyAsync(value -> {
                  System.out.println(":::::::Double : " + value);
                  return 2 * value;
              }, service)
              .thenAccept(value -> {
                   System.out.println("Answer : " + value);
              });
   }
 } catch (Exception e) {
     e.printStackTrace();
 } finally {
     try {
          reader.close();
     } catch (Exception e) {
          throw new RuntimeException(e.getMessage());
     }
 }

为简单起见,仅粘贴主要方法代码,假设变量已声明并在范围内。

问题

代码

设计

编辑 1 已添加

public void shutdown() {
   service.shutdown();
}

  reader.close();
  fileReaderAsync.shutdown();

成功了。

池中有 4 个线程,但 Thread.sleep() 会阻塞主线程。您的程序读取一行,最大块。 5 秒,然后将触发异步代码,它根本不需要任何异步性,实际上会产生巨大的开销。

不要在异步程序中使用 Thread.sleep()。

但我试图了解你的代码,我可以提供这个:

public int calcWork(final int x) {
    return x*x;
}

public void iter_async_rec(final BufferedReader reader) {
    String line = reader.readline();
    if (line != null) {     
         int i = Integer.tryParse(line); // checks required       
         CompetableFuture.supplyAsync(calcWork(i))
         .thenSupplyAsync(i->System.out.println(i))
         .thenRunAsync(()->iter_asnc_rec(reader))
    }
}

另外:大多数时候只使用标准执行器是最好的选择。相反,给定的样本不会提高速度。

也许看看响应式的想法!? reactivejava

问题

您正在使用由以下人员创建的线程池:

ExecutorService service = Executors.newFixedThreadPool(4);

默认配置为使用非守护进程线程。正如 java.lang.Thread 所记录的那样:

When a Java Virtual Machine starts up, there is usually a single non-daemon thread (which typically calls the method named main of some designated class). The Java Virtual Machine continues to execute threads until either of the following occurs:

  • The exit method of class Runtime has been called and the security manager has permitted the exit operation to take place.
  • All threads that are not daemon threads have died, either by returning from the call to the run method or by throwing an exception that propagates beyond the run method.

换句话说,任何仍处于活动状态的非守护进程线程也会使 JVM 保持活动状态。


解决方案

您的问题至少有两种解决方案。

关闭线程池

您可以在使用完后关闭线程池。

service.shutdown(); // Calls ExecutorService#shutdown()

#shutdown() 方法启动正常关机。它阻止提交任何新任务,但允许完成任何已提交的任务。一旦所有任务都完成,池将终止(即所有线程都将被允许死亡)。如果您想等待所有任务完成后再继续,那么您可以在调用 #shutdown() / #shutdownNow().

之后调用 #awaitTermination(long,TimeUnit)

如果您想立即关闭池,请致电 #shutdownNow()。任何当前正在执行的任务都将被取消,任何已提交但尚未开始的任务都不会执行(实际上会在列表中返回给您)。请注意任务是否响应取消取决于该任务的实施方式。

使用守护线程

A daemon 线程不会使 JVM 保持活动状态。您可以通过 ThreadFactory.

配置线程池以使用 daemon 线程
ExecutorService service = Executors.newFixedThreadPool(4, r -> {
  Thread t = new Thread(r); // may want to name the threads
  t.setDaemon(true);
  return t;
});

请注意,无论如何,您仍应在完成线程池后将其关闭。