后台线程中的可编辑任务队列 运行

Editable queue of tasks running in background thread

我知道这个问题已经回答了很多次,但我很难理解它是如何工作的。

所以在我的应用程序中,用户必须能够 select 项,这些项将被添加到队列中(使用 ObservableList<Task>ListView 中显示)并且每个项都需要由 ExecutorService 顺序处理。

此外,该队列应该是可编辑的(更改顺序并从列表中删除项目)。

private void handleItemClicked(MouseEvent event) {
    if (event.getClickCount() == 2) {
        File item = listView.getSelectionModel().getSelectedItem();
        Task<Void> task = createTask(item);
        facade.getTaskQueueList().add(task); // this list is bound to a ListView, where it can be edited
        Future result = executor.submit(task); 
        // where executor is an ExecutorService of which type?

        try {
            result.get();
        } catch (Exception e) {
            // ...
        }
    }
}

executor = Executors.newFixedThreadPool(1) 尝试过,但我无法控制队列。
我阅读了 ThreadPoolExecutor 和队列,但我很难理解它,因为我对并发还很陌生。

我需要在后台线程中运行那个方法handleItemClicked,这样UI就不会冻结,最好的方法是什么?

总结:如何实现任务队列,由后台线程编辑和顺序处理?

请帮我想想办法

编辑 使用 vanOekel 的 SerialTaskQueue class 帮助了我,现在我想将任务列表绑定到我的 ListView.

ListProperty<Runnable> listProperty = new SimpleListProperty<>();
listProperty.set(taskQueue.getTaskList()); // getTaskList() returns the LinkedList from SerialTaskQueue
queueListView.itemsProperty().bind(listProperty); 

显然这不起作用,因为它需要一个 ObservableList。有什么优雅的方法吗?

我能想到的最简单的解决方案是在执行程序外部维护任务列表,并使用回调向执行程序提供下一个任务(如果可用)。不幸的是,它涉及任务列表上的同步和一个 AtomicBoolean 来指示正在执行的任务。

回调只是一个 Runnable 将原始任务包装到 运行 然后 "calls back" 以查看是否有另一个任务要执行,如果有,则使用(后台)执行者。

需要同步才能使任务列表保持有序并处于已知状态。任务列表可以同时被两个线程修改:通过执行者(后台)线程中的回调 运行ning 和通过 UI 前台线程执行的 handleItemClicked 方法。这反过来意味着永远无法准确知道任务列表何时为空。为了使任务列表保持有序并处于已知的固定状态,需要同步任务列表。

这仍然留下了一个模棱两可的时刻来决定任务何时准备好执行。这就是 AtomicBoolean 的用武之地:一个值集总是立即可用并被任何其他线程读取,而 compareAndSet 方法将始终确保只有一个线程获得 "OK".

结合同步和 AtomicBoolean 的使用允许创建一个带有 "critical section" 的方法,可以同时被前台和后台线程调用以触发如果可能,执行新任务。以下代码的设计和设置方式使得可以存在一种这样的方法 (runNextTask)。最好使并发代码中的 "critical section" 尽可能简单明了(这反过来通常会导致高效的 "critical section")。

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class SerialTaskQueue {

    public static void main(String[] args) {

        ExecutorService executor = Executors.newSingleThreadExecutor();
        // all operations on this list must be synchronized on the list itself.
        SerialTaskQueue tq = new SerialTaskQueue(executor);
        try {
            // test running the tasks one by one
            tq.add(new SleepSome(10L));
            Thread.sleep(5L);
            tq.add(new SleepSome(20L));
            tq.add(new SleepSome(30L));

            Thread.sleep(100L);
            System.out.println("Queue size: " + tq.size()); // should be empty
            tq.add(new SleepSome(10L));

            Thread.sleep(100L);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdownNow();
        }
    }

    // all lookups and modifications to the list must be synchronized on the list.
    private final List<Runnable> tasks = new LinkedList<Runnable>();
    // atomic boolean used to ensure only 1 task is executed at any given time
    private final AtomicBoolean executeNextTask = new AtomicBoolean(true);
    private final Executor executor;

    public SerialTaskQueue(Executor executor) {
        this.executor = executor;
    }

    public void add(Runnable task) {

        synchronized(tasks) { tasks.add(task); }
        runNextTask();
    }

    private void runNextTask() {
        // critical section that ensures one task is executed.
        synchronized(tasks) {
            if (!tasks.isEmpty()
                    && executeNextTask.compareAndSet(true, false)) {
                executor.execute(wrapTask(tasks.remove(0)));
            }
        }
    }

    private CallbackTask wrapTask(Runnable task) {

        return new CallbackTask(task, new Runnable() {
            @Override public void run() {
                if (!executeNextTask.compareAndSet(false, true)) {
                    System.out.println("ERROR: programming error, the callback should always run in execute state.");
                }
                runNextTask();
            }
        });
    }

    public int size() {
        synchronized(tasks) { return tasks.size(); }
    }

    public Runnable get(int index) {
        synchronized(tasks) { return tasks.get(index); }
    }

    public Runnable remove(int index) {
        synchronized(tasks) { return tasks.remove(index); }
    }

    // general callback-task, see 
    static class CallbackTask implements Runnable {

        private final Runnable task, callback;

        public CallbackTask(Runnable task, Runnable callback) {
            this.task = task;
            this.callback = callback;
        }

        @Override public void run() {
            try {
                task.run();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    callback.run();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // task that just sleeps for a while
    static class SleepSome implements Runnable {

        static long startTime = System.currentTimeMillis();

        private final long sleepTimeMs;
        public SleepSome(long sleepTimeMs) {
            this.sleepTimeMs = sleepTimeMs;
        }
        @Override public void run() {
            try { 
                System.out.println(tdelta() + "Sleeping for " + sleepTimeMs + " ms.");
                Thread.sleep(sleepTimeMs);
                System.out.println(tdelta() + "Slept for " + sleepTimeMs + " ms.");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private String tdelta() { return String.format("% 4d ", (System.currentTimeMillis() - startTime)); }
    }
}

更新:如果任务组需要串行执行,请查看改编后的实现