使用 Java 8 个流和 CompletableFuture 的并行数据库调用

Parallel database calls using Java 8 streams and CompletableFuture

我想用 Java 8 个流复制和并行化以下行为:

for (animal : animalList) {
        // find all other animals with the same breed
        Collection<Animal> queryResult = queryDatabase(animal.getBreed());

        if (animal.getSpecie() == cat) {
            catList.addAll(queryResult);
        } else {
            dogList.addAll(queryResult);
        }
}

这是我目前所拥有的

final Executor queryExecutor =
        Executors.newFixedThreadPool(Math.min(animalList.size(), 10),
                new ThreadFactory(){
                    public Thread newThread(Runnable r){
                        Thread t = new Thread(r);
                        t.setDaemon(true);
                        return t;
                    }
                });

List<CompletableFuture<Collection<Animal>>> listFutureResult =  animalList.stream()
        .map(animal -> CompletableFuture.supplyAsync(
                () -> queryDatabase(animal.getBreed()), queryExecutor))
        .collect(Collectors.toList());

List<Animal> = listFutureResult.stream()
        .map(CompletableFuture::join)
        .flatMap(subList -> subList.stream())
        .collect(Collectors.toList());

1 - 我不确定如何拆分流以便我可以获得 2 个不同的动物列表,一个用于猫,一个用于狗。

2 - 这个解决方案看起来合理吗?

首先,考虑只使用

List<Animal> result = animalList.parallelStream()
    .flatMap(animal -> queryDatabase(animal.getBreed()).stream())
    .collect(Collectors.toList());

即使它不会为您提供最多 10 个所需的并发。简单性可能会弥补它。至于另一部分,就这么简单

Map<Boolean,List<Animal>> result = animalList.parallelStream()
    .flatMap(animal -> queryDatabase(animal.getBreed()).stream())
    .collect(Collectors.partitioningBy(animal -> animal.getSpecie() == cat));
List<Animal> catList = result.get(true), dogList = result.get(false);

如果您的物种不仅仅是猫和狗,您可以使用 Collectors.groupingBy(Animal::getSpecie) 获取从物种到动物列表的地图。


如果您坚持使用自己的线程池,有几点可以改进:

Executor queryExecutor = Executors.newFixedThreadPool(Math.min(animalList.size(), 10),
    r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });
List<Animal> result =  animalList.stream()
    .map(animal -> CompletableFuture.completedFuture(animal.getBreed())
        .thenApplyAsync(breed -> queryDatabase(breed), queryExecutor))
    .collect(Collectors.toList()).stream()
    .flatMap(cf -> cf.join().stream())
    .collect(Collectors.toList());

您的 supplyAsync 变体需要捕获实际的 Animal 实例,为每只动物创建一个新的 Supplier。相反,传递给 thenApplyAsync 的函数是不变的,对每个参数值执行相同的操作。上面的代码假定 getBreed 是一个廉价操作,否则,将 Animal 实例传递给 completedFuture 并使用异步函数执行 getBreed() 并不难.

.map(CompletableFuture::join) 可以替换为 flatMap 函数中的简单链式 .join()。否则,如果您更喜欢方法引用,则应始终如一地使用它们,即 .map(CompletableFuture::join).flatMap(Collection::stream).

当然,这个变体也允许使用 partitioningBy 而不是 toList


最后一点,如果您在使用后在执行程序服务上调用 shutdown,则无需将线程标记为守护进程:

ExecutorService queryExecutor=Executors.newFixedThreadPool(Math.min(animalList.size(),10));
Map<Boolean,List<Animal>> result =  animalList.stream()
    .map(animal -> CompletableFuture.completedFuture(animal.getBreed())
        .thenApplyAsync(breed -> queryDatabase(breed), queryExecutor))
    .collect(Collectors.toList()).stream()
    .flatMap(cf -> cf.join().stream())
    .collect(Collectors.partitioningBy(animal -> animal.getSpecie() == cat));
List<Animal> catList = result.get(true), dogList = result.get(false);
queryExecutor.shutdown();