后台线程中的可编辑任务队列 运行
Editable queue of tasks running in background thread
我知道这个问题已经回答了很多次,但我很难理解它是如何工作的。
所以在我的应用程序中,用户必须能够 select 项,这些项将被添加到队列中(使用 ObservableList<Task>
在 ListView
中显示)并且每个项都需要由 ExecutorService
顺序处理。
此外,该队列应该是可编辑的(更改顺序并从列表中删除项目)。
private void handleItemClicked(MouseEvent event) {
if (event.getClickCount() == 2) {
File item = listView.getSelectionModel().getSelectedItem();
Task<Void> task = createTask(item);
facade.getTaskQueueList().add(task); // this list is bound to a ListView, where it can be edited
Future result = executor.submit(task);
// where executor is an ExecutorService of which type?
try {
result.get();
} catch (Exception e) {
// ...
}
}
}
用 executor = Executors.newFixedThreadPool(1)
尝试过,但我无法控制队列。
我阅读了 ThreadPoolExecutor
和队列,但我很难理解它,因为我对并发还很陌生。
我需要在后台线程中运行那个方法handleItemClicked
,这样UI就不会冻结,最好的方法是什么?
总结:如何实现任务队列,由后台线程编辑和顺序处理?
请帮我想想办法
编辑
使用 vanOekel 的 SerialTaskQueue
class 帮助了我,现在我想将任务列表绑定到我的 ListView
.
ListProperty<Runnable> listProperty = new SimpleListProperty<>();
listProperty.set(taskQueue.getTaskList()); // getTaskList() returns the LinkedList from SerialTaskQueue
queueListView.itemsProperty().bind(listProperty);
显然这不起作用,因为它需要一个 ObservableList。有什么优雅的方法吗?
我能想到的最简单的解决方案是在执行程序外部维护任务列表,并使用回调向执行程序提供下一个任务(如果可用)。不幸的是,它涉及任务列表上的同步和一个 AtomicBoolean
来指示正在执行的任务。
回调只是一个 Runnable
将原始任务包装到 运行 然后 "calls back" 以查看是否有另一个任务要执行,如果有,则使用(后台)执行者。
需要同步才能使任务列表保持有序并处于已知状态。任务列表可以同时被两个线程修改:通过执行者(后台)线程中的回调 运行ning 和通过 UI 前台线程执行的 handleItemClicked
方法。这反过来意味着永远无法准确知道任务列表何时为空。为了使任务列表保持有序并处于已知的固定状态,需要同步任务列表。
这仍然留下了一个模棱两可的时刻来决定任务何时准备好执行。这就是 AtomicBoolean
的用武之地:一个值集总是立即可用并被任何其他线程读取,而 compareAndSet
方法将始终确保只有一个线程获得 "OK".
结合同步和 AtomicBoolean
的使用允许创建一个带有 "critical section" 的方法,可以同时被前台和后台线程调用以触发如果可能,执行新任务。以下代码的设计和设置方式使得可以存在一种这样的方法 (runNextTask
)。最好使并发代码中的 "critical section" 尽可能简单明了(这反过来通常会导致高效的 "critical section")。
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class SerialTaskQueue {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
// all operations on this list must be synchronized on the list itself.
SerialTaskQueue tq = new SerialTaskQueue(executor);
try {
// test running the tasks one by one
tq.add(new SleepSome(10L));
Thread.sleep(5L);
tq.add(new SleepSome(20L));
tq.add(new SleepSome(30L));
Thread.sleep(100L);
System.out.println("Queue size: " + tq.size()); // should be empty
tq.add(new SleepSome(10L));
Thread.sleep(100L);
} catch (Exception e) {
e.printStackTrace();
} finally {
executor.shutdownNow();
}
}
// all lookups and modifications to the list must be synchronized on the list.
private final List<Runnable> tasks = new LinkedList<Runnable>();
// atomic boolean used to ensure only 1 task is executed at any given time
private final AtomicBoolean executeNextTask = new AtomicBoolean(true);
private final Executor executor;
public SerialTaskQueue(Executor executor) {
this.executor = executor;
}
public void add(Runnable task) {
synchronized(tasks) { tasks.add(task); }
runNextTask();
}
private void runNextTask() {
// critical section that ensures one task is executed.
synchronized(tasks) {
if (!tasks.isEmpty()
&& executeNextTask.compareAndSet(true, false)) {
executor.execute(wrapTask(tasks.remove(0)));
}
}
}
private CallbackTask wrapTask(Runnable task) {
return new CallbackTask(task, new Runnable() {
@Override public void run() {
if (!executeNextTask.compareAndSet(false, true)) {
System.out.println("ERROR: programming error, the callback should always run in execute state.");
}
runNextTask();
}
});
}
public int size() {
synchronized(tasks) { return tasks.size(); }
}
public Runnable get(int index) {
synchronized(tasks) { return tasks.get(index); }
}
public Runnable remove(int index) {
synchronized(tasks) { return tasks.remove(index); }
}
// general callback-task, see
static class CallbackTask implements Runnable {
private final Runnable task, callback;
public CallbackTask(Runnable task, Runnable callback) {
this.task = task;
this.callback = callback;
}
@Override public void run() {
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
callback.run();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
// task that just sleeps for a while
static class SleepSome implements Runnable {
static long startTime = System.currentTimeMillis();
private final long sleepTimeMs;
public SleepSome(long sleepTimeMs) {
this.sleepTimeMs = sleepTimeMs;
}
@Override public void run() {
try {
System.out.println(tdelta() + "Sleeping for " + sleepTimeMs + " ms.");
Thread.sleep(sleepTimeMs);
System.out.println(tdelta() + "Slept for " + sleepTimeMs + " ms.");
} catch (Exception e) {
e.printStackTrace();
}
}
private String tdelta() { return String.format("% 4d ", (System.currentTimeMillis() - startTime)); }
}
}
更新:如果任务组需要串行执行,请查看改编后的实现 。
我知道这个问题已经回答了很多次,但我很难理解它是如何工作的。
所以在我的应用程序中,用户必须能够 select 项,这些项将被添加到队列中(使用 ObservableList<Task>
在 ListView
中显示)并且每个项都需要由 ExecutorService
顺序处理。
此外,该队列应该是可编辑的(更改顺序并从列表中删除项目)。
private void handleItemClicked(MouseEvent event) {
if (event.getClickCount() == 2) {
File item = listView.getSelectionModel().getSelectedItem();
Task<Void> task = createTask(item);
facade.getTaskQueueList().add(task); // this list is bound to a ListView, where it can be edited
Future result = executor.submit(task);
// where executor is an ExecutorService of which type?
try {
result.get();
} catch (Exception e) {
// ...
}
}
}
用 executor = Executors.newFixedThreadPool(1)
尝试过,但我无法控制队列。
我阅读了 ThreadPoolExecutor
和队列,但我很难理解它,因为我对并发还很陌生。
我需要在后台线程中运行那个方法handleItemClicked
,这样UI就不会冻结,最好的方法是什么?
总结:如何实现任务队列,由后台线程编辑和顺序处理?
请帮我想想办法
编辑
使用 vanOekel 的 SerialTaskQueue
class 帮助了我,现在我想将任务列表绑定到我的 ListView
.
ListProperty<Runnable> listProperty = new SimpleListProperty<>();
listProperty.set(taskQueue.getTaskList()); // getTaskList() returns the LinkedList from SerialTaskQueue
queueListView.itemsProperty().bind(listProperty);
显然这不起作用,因为它需要一个 ObservableList。有什么优雅的方法吗?
我能想到的最简单的解决方案是在执行程序外部维护任务列表,并使用回调向执行程序提供下一个任务(如果可用)。不幸的是,它涉及任务列表上的同步和一个 AtomicBoolean
来指示正在执行的任务。
回调只是一个 Runnable
将原始任务包装到 运行 然后 "calls back" 以查看是否有另一个任务要执行,如果有,则使用(后台)执行者。
需要同步才能使任务列表保持有序并处于已知状态。任务列表可以同时被两个线程修改:通过执行者(后台)线程中的回调 运行ning 和通过 UI 前台线程执行的 handleItemClicked
方法。这反过来意味着永远无法准确知道任务列表何时为空。为了使任务列表保持有序并处于已知的固定状态,需要同步任务列表。
这仍然留下了一个模棱两可的时刻来决定任务何时准备好执行。这就是 AtomicBoolean
的用武之地:一个值集总是立即可用并被任何其他线程读取,而 compareAndSet
方法将始终确保只有一个线程获得 "OK".
结合同步和 AtomicBoolean
的使用允许创建一个带有 "critical section" 的方法,可以同时被前台和后台线程调用以触发如果可能,执行新任务。以下代码的设计和设置方式使得可以存在一种这样的方法 (runNextTask
)。最好使并发代码中的 "critical section" 尽可能简单明了(这反过来通常会导致高效的 "critical section")。
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class SerialTaskQueue {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
// all operations on this list must be synchronized on the list itself.
SerialTaskQueue tq = new SerialTaskQueue(executor);
try {
// test running the tasks one by one
tq.add(new SleepSome(10L));
Thread.sleep(5L);
tq.add(new SleepSome(20L));
tq.add(new SleepSome(30L));
Thread.sleep(100L);
System.out.println("Queue size: " + tq.size()); // should be empty
tq.add(new SleepSome(10L));
Thread.sleep(100L);
} catch (Exception e) {
e.printStackTrace();
} finally {
executor.shutdownNow();
}
}
// all lookups and modifications to the list must be synchronized on the list.
private final List<Runnable> tasks = new LinkedList<Runnable>();
// atomic boolean used to ensure only 1 task is executed at any given time
private final AtomicBoolean executeNextTask = new AtomicBoolean(true);
private final Executor executor;
public SerialTaskQueue(Executor executor) {
this.executor = executor;
}
public void add(Runnable task) {
synchronized(tasks) { tasks.add(task); }
runNextTask();
}
private void runNextTask() {
// critical section that ensures one task is executed.
synchronized(tasks) {
if (!tasks.isEmpty()
&& executeNextTask.compareAndSet(true, false)) {
executor.execute(wrapTask(tasks.remove(0)));
}
}
}
private CallbackTask wrapTask(Runnable task) {
return new CallbackTask(task, new Runnable() {
@Override public void run() {
if (!executeNextTask.compareAndSet(false, true)) {
System.out.println("ERROR: programming error, the callback should always run in execute state.");
}
runNextTask();
}
});
}
public int size() {
synchronized(tasks) { return tasks.size(); }
}
public Runnable get(int index) {
synchronized(tasks) { return tasks.get(index); }
}
public Runnable remove(int index) {
synchronized(tasks) { return tasks.remove(index); }
}
// general callback-task, see
static class CallbackTask implements Runnable {
private final Runnable task, callback;
public CallbackTask(Runnable task, Runnable callback) {
this.task = task;
this.callback = callback;
}
@Override public void run() {
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
callback.run();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
// task that just sleeps for a while
static class SleepSome implements Runnable {
static long startTime = System.currentTimeMillis();
private final long sleepTimeMs;
public SleepSome(long sleepTimeMs) {
this.sleepTimeMs = sleepTimeMs;
}
@Override public void run() {
try {
System.out.println(tdelta() + "Sleeping for " + sleepTimeMs + " ms.");
Thread.sleep(sleepTimeMs);
System.out.println(tdelta() + "Slept for " + sleepTimeMs + " ms.");
} catch (Exception e) {
e.printStackTrace();
}
}
private String tdelta() { return String.format("% 4d ", (System.currentTimeMillis() - startTime)); }
}
}
更新:如果任务组需要串行执行,请查看改编后的实现