如何每次使用 CompletableFuture 的 supplyAsync 运行 具有多个输入的相同方法?

How to use supplyAsync of CompletableFuture to run the same method with multiple inputs each time?

我有以下代码,我在其中创建供应商并使用 completableFuture 的 supplyAsync 方法在异步执行后调用另一个方法。

public void runParallelFunctions(MyInput myInput) {
    Supplier<Map<String, String>> taskSupplier = () -> {
        try {
            return invokeLambda("input1");
        } catch (Exception e) {
            System.out.println(e);
        }
        return new HashMap<>();
    };
    
    for (int i = 0; i < 5; i++) {
        CompletableFuture.supplyAsync(taskSupplier::get, executorService)
                         .thenAccept(this::printResultsFromParallelInvocations);
    }
    System.out.println("Doing other work....");
}

下面是我执行完成后调用的方法

private void printResultsFromParallelInvocations(Map<String, String> result) {
        result.forEach((key, value) -> System.out.println(key + ": " + value));
}

在上面的代码中,如何调用方法 invokeLambda 传递多个参数,如“input1”、“input2”等?我可以通过循环生成输入,但我如何使用供应商的某种列表,以便我可以为 supplyAsync 方法调用整个列表?我无法使用 runAsync 方法,因为我有一个 return 值需要调用 printResultsFromParallelInvocations。我是期货和异步回调的新手,非常感谢任何帮助。提前致谢。

您可以在循环内即时创建新的 Supplier。

public static Supplier<Map<String, String>> supplierFunc(Object... args) {
    return () -> {
        try {
            return invokeLambda(args);
        } catch (Exception e) {
            System.out.println(e);
        }
        return new HashMap<>();
    };
}

public void runParallelFunctions(Object myInput) {
    for (int i = 0; i < 5; i++) {
        CompletableFuture.supplyAsync(supplierFunc("input1", "input2"), executorService)
                .thenAccept(this::printResultsFromParallelInvocations);
    }
    System.out.println("Doing other work....");
}

您不能创建一个 Supplier<Map<String, String>> 并期望它在五次评估中表现不同。它需要外部可变状态才能检测到评估是第 n 个评估,同时与执行五个 concurrent 无顺序评估的想法相矛盾。

只需创建五个不同的供应商,例如

for(int i = 0; i < 5; i++) {
    String input = "input" + i;
    CompletableFuture.supplyAsync(() -> invokeLambda(input), executorService)
        .thenAccept(this::printResultsFromParallelInvocations);
}

在每次循环迭代中,lambda 表达式 () -> invokeLambda(input) 捕获 input 的当前值并创建适当的 Supplier 实例。

旁注:

  • 不要像 invokeLambda 那样在技术方面命名方法,而是尝试表达它们的 目的 .

  • 原始代码中的 taskSupplier::get 是一个不必要的方法引用,因为它产生了 Supplier 调用已经是 Supplier 的对象上的方法。因此,如果希望每次评估都获得相同的行为,taskSupplier 可以直接传递给 supplyAsync