Java 调用任务集合(与 invokeAll 一样)

Java Invoke Collection of Tasks (as with invokeAll)

我正在寻找一种方法来处理 javafx.concurrent.Taskjava.util.concurrent.CallableExecutorService.invokeAll(Collection<? extends Callable<T>>) 可以做什么 此方法不能用于任务,因为它们是可运行的并且不是可调用对象。

使用 Executors.callable(Runnable task) 将我的任务类型转换为 Callable 无效,因为 returned Callable 将始终 return null 并且我需要值 return来自 Task.call().

我了解 Task 不适用于 Collections。但是我在我的 JavaFX 应用程序中需要一个 Task 并通过编写测试遇到了这个问题。

除了简单地让我的 Task 实现 Callable 之外,还有什么办法可以避免这种情况吗?我不想这样做,因为我会更改我的代码以适应我的测试。

您可以将每个任务包装在一个可调用文件中。本质上,对于每个 Task<T> task,您需要一个 Callable<T>,如下所示:

Callable<T> callable = () -> {
    task.run();
    return task.getValue();
};

因此,例如,您可以定义一个 Function<Task<T>, Callable<T>> 来执行此映射

Function<Task<T>, Callable<T>> taskWrapper = task -> () -> {
    task.run();
    return task.getValue();
};

然后给出 ExecutorService execCollection<Task<T>> tasks 你可以做

List<Future<T>> results = exec.invokeAll(tasks.stream()
    .map(taskWrapper)
    .collect(Collectors.toList()));

这是一个 SSCCE:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;

import javafx.application.Application;
import javafx.concurrent.Task;
import javafx.geometry.Insets;
import javafx.geometry.Pos;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.Label;
import javafx.scene.layout.VBox;
import javafx.stage.Stage;

public class InvokeAllTasks extends Application {

    private Random rng = new Random();
    private ExecutorService exec = Executors.newFixedThreadPool(5);
    private Function<Task<Integer>, Callable<Integer>> taskWrapper = task -> () -> {
        task.run();
        return task.getValue();
    };

    @Override
    public void start(Stage primaryStage) {
        Button runAll = new Button("Run all tasks");
        Label status = new Label();

        runAll.setOnAction(e -> {
            List<Task<Integer>> tasks = createTasks();


            Task<List<Future<Integer>>> runAllTask = new Task<List<Future<Integer>>>() {
                @Override
                protected List<Future<Integer>> call() throws Exception {
                    return exec.invokeAll(tasks.stream().map(taskWrapper).collect(Collectors.toList()));
                }
            };
            status.setText("Running...");
            runAllTask.setOnSucceeded(evt -> status.setText("All Done"));
            new Thread(runAllTask).start();


        });

        VBox root = new VBox(5, runAll, status);
        root.setMinHeight(120);
        root.setAlignment(Pos.CENTER);
        root.setPadding(new Insets(10));

        Scene scene = new Scene(root);
        primaryStage.setScene(scene);
        primaryStage.show();
    }

    @Override
    public void stop() {
        exec.shutdown();
    }

    private List<Task<Integer>> createTasks() {
        List<Task<Integer>> tasks = new ArrayList<>();
        for (int i = 1 ; i <= 8 ; i++) {
            String name = "Task "+i;
            Task<Integer> t = new Task<Integer>() {
                @Override
                protected Integer call() throws Exception {
                    System.out.println(name+" running");
                    Thread.sleep(rng.nextInt(1000)+500);
                    int result = rng.nextInt(500);
                    System.out.println(name+" computed "+result);
                    return result;
                }
            };
            tasks.add(t);
        }
        return tasks;
    }

    public static void main(String[] args) {
        launch(args);
    }
}