Java 8 可完成的未来

Java 8 Completable Future

我的问题是如何使用 Completable Future。

我有一个 class 实现了 Callable。

public class Executor implements Callable<Collection>

前面是用来做的-

service.submit(collectorService);

哪个会 return 一个 Future<Collection>。但是我们不想再使用 future 并且需要 CompletableFuture 。一种想法是我们不需要使用 CompletableFuture 进行轮询,我们也不必等待和阻塞直到它准备好。

那么当 callable 线程完成时,我将如何使用可完成的未来并调用一个函数说 isDone()

给定一个 CompletableFuture<T> f,您可以在 运行 完成后启动一个同步或异步任务:

f.thenApply(result -> isDone(result));      // sync callback
f.thenApplyAsync(result -> isDone(result)); // async callback

...或者,如果您不需要结果:

f.thenRun(() -> isDone());
f.thenRunAsync(() -> isDone());

如果我没理解错的话,你想知道如何提交返回 CompletableFuture 的 "task"(你之前的 "Executor")。

您可以通过调用

CompletableFuture.supplyAsync(collectorService)

区别在于您的 "Executor" 现在必须实施 Supplier 而不是 Callable

您可以创建调用现有 collectorService 的 lambda 表达式。 CompletableFuture.supplyAsync 将接受的 Supplier lambda 表达式看起来像这样

 Supplier<Collection> supplier = () -> collectorService.call();

并且可以与 CompletableFuture 一起使用,如下所示

  CompletableFuture.supplyAsync(() -> collectorService.call(),service)
         .thenApply(collection->isDone(collection);

正如其他人指出的那样,thenApply 将在 collectorService.call() 方法 returns 结果时执行 - 在执行我们的 Future 任务的同一线程上。使用 thenApplyAsync 会将另一个任务重新提交给执行程序服务(原始性能大约慢一个数量级,所以除非你也有充分的理由,否则不要这样做!)。

我们不在 completableFuture 中传递 runnable 或 callable。它采用作为功能接口的供应商类型。只需创建普通方法并将它们与执行者的对象一起传递。作为参考,请考虑以下示例。

package completableFuture;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompFuture {
    ExecutorService firstExecService = Executors.newFixedThreadPool(5);

    public static void main(String[] args) {

        CompFuture compFuture = new CompFuture();
        compFuture.testMe("Java");
    }

    public String m1(String param) {

        Random r = new Random();
        int val = r.nextInt(20) * 1000;
        System.out.println(Thread.currentThread().getName() + " " + val);

        try {
            Thread.sleep(val);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return param + " Hello my";
    }

    public void m2(String salutation) {
        System.out.println(Thread.currentThread().getName() + "  ##" + salutation + " Friend!");
    }

    public void testMe(String start) {
        System.out.println("TM: " + Thread.currentThread());

        for (int i = 0; i < 5; i++) {
            CompletableFuture.supplyAsync(() -> m1(start), firstExecService).thenAccept(s -> m2(s));
        }
    }

}

以上程序的输出:: 执行时间最短的线程首先给出其输出。

TM: 线程[main,5,main]

pool-1-thread-1 1000

池 1-线程 2 14000

pool-1-thread-4 3000

池 1-线程 3 0

pool-1-thread-5 9000

pool-1-thread-3 ##Java你好我的朋友!

pool-1-thread-1 ##Java你好我的朋友!

pool-1-thread-4 ##Java你好我的朋友!

pool-1-thread-5 ##Java你好我的朋友!

pool-1-thread-2 ##Java你好我的朋友!