Java 等待通知死锁问题
Java wait notify deadlock issue
这是我的 class,它连续执行相关的 Runnable
。它所做的是所有 Runnable
并行执行,但在完成时等待队列中存在的 head
Runnable
首先完成。一旦 head 完成,第二项完成,依此类推。
此代码的问题在于它会导致某种死锁。当执行很多任务时,它停止执行。暂停调试器时,显示所有线程都在等待 wait()
语句。
/**
* Executes all tasks in parallel, with completion handler called only when other tasks of same key are complete.
* For a given key, the order in which {@link #execute(Object, int, java.util.concurrent.Callable, Runnable, Runnable)} was called will be the order in which completion runnable will be called.
*/
public class DependentExecutor {
private final Executor executor;
private final Map<Object, Queue<DependentTask>> allTasks = new ArrayMap<>();
private final boolean enableDependency;
public DependentExecutor(boolean enableDependency, Executor executor) {
this.executor = executor;
this.enableDependency = enableDependency;
}
/**
* You should return true from the task on successful completion.
* If task returns false, then completion runnable wont be executed.
* <p/>
* This method will return false if tha task with this uniqueId already exists. Otherwise true is returned.
*
* @param key A non null key using which task dependency is decided. Tasks with same key are dependent.
* @param uniqueId If there is a task with this uniqueId already present, this task will be rejected
* @param task Optional. A long pending task to be performed or null if only completion is to be dependant.
* @param completionCallback A non null callback which will be serially executed for tasks with same key
* @param errorCallback If task returns false, then this callback will be invoked immediately (no dependency)
*/
public boolean execute(Object key, int uniqueId, Callable<Boolean> task, Runnable completionCallback, Runnable errorCallback) {
DependentTask queuedTask;
synchronized (allTasks) {
Queue<DependentTask> queue = allTasks.get(key);
for (Map.Entry<Object, Queue<DependentTask>> objectQueueEntry : allTasks.entrySet()) {
synchronized (objectQueueEntry.getValue()) {
Iterator<DependentTask> iterator = objectQueueEntry.getValue().iterator();
while (iterator.hasNext()) {
DependentTask dependentTask = iterator.next();
if (dependentTask.getUniqueId() == uniqueId) {
// no 2 tasks can have same uniqueID
return false;
}
}
}
}
if (queue == null && task == null) {
// this means we have no pending dependency as well as no task to perform. So only callback.
completionCallback.run();
return true;
} else if (queue == null) {
queue = new LinkedList<DependentTask>();
allTasks.put(key, queue);
}
if (!enableDependency) {
key = Math.random();
}
queuedTask = new DependentTask(key, uniqueId, queue, task, completionCallback, errorCallback);
queue.add(queuedTask);
}
executor.execute(queuedTask);
return true;
}
class DependentTask implements Runnable {
private final Queue<DependentTask> dependencyQueue;
private final Callable<Boolean> task;
private final Object key;
private final Runnable completionCallback;
private final Runnable errorCallback;
private final int uniqueId;
public DependentTask(Object key, int uniqueId, Queue<DependentTask> dependencyQueue, Callable<Boolean> task, Runnable completionCallback, Runnable errorCallback) {
this.uniqueId = uniqueId;
this.task = task;
this.dependencyQueue = dependencyQueue;
this.key = key;
this.completionCallback = completionCallback;
this.errorCallback = errorCallback;
}
public int getUniqueId() {
return uniqueId;
}
@Override
public void run() {
Boolean result = false;
try {
if (task != null) {
result = task.call();
} else {
result = true;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (result) {
synchronized (dependencyQueue) {
while (dependencyQueue.peek() != this) {
try {
dependencyQueue.wait(); // deadlock !!
} catch (InterruptedException e) {
}
}
}
completionCallback.run(); // by now we are the first element in the linked list. Lets call completion.
} else {
errorCallback.run(); // by now we are the first element in the linked list. Lets call error callback.
}
synchronized (dependencyQueue) {
dependencyQueue.remove(); //remove thyself
dependencyQueue.notifyAll();
}
// clean up of main map
synchronized (allTasks) {
if (dependencyQueue.isEmpty()) {
allTasks.remove(key);
}
}
}
}
}
}
问题#1
您从队列中删除 "self" 的逻辑有误。您无条件地从队列中移除,并且总是从顶部移除(即任务实际上并没有从队列中移除自己,它总是移除顶部),但是检查顶部是否实际上是任务的一部分是有条件的 - 并且仅在实施任务返回 true
.
时运行
因此,任何时候执行任务 returns false
,或因异常而失败,任务将从队列顶部删除一些东西,很有可能它不是自己.因此,被移除的任务仍然是运行,永远不会找到自己在顶部,并且会无休止地等待。
问题#2
您正在同步之外修改 dependencyQueue。您的队列实现是 LinkedList,它不是线程安全的。你应该使用:
synchronized (queue) {
queue.add(queuedTask);
}
当您将新任务添加到队列时。
最有可能发生的事情是 add()
与 remove()
同时调用,这会破坏列表的内部状态。 add()
实际上失败了(列表不包含添加的元素),因此相应的线程永远不会在列表中找到它自己。如果您可以轻松地重现它,您可以通过连接调试器并评估队列中的值来测试它 - 您会看到 "hung" 线程甚至不存在。
这是我的 class,它连续执行相关的 Runnable
。它所做的是所有 Runnable
并行执行,但在完成时等待队列中存在的 head
Runnable
首先完成。一旦 head 完成,第二项完成,依此类推。
此代码的问题在于它会导致某种死锁。当执行很多任务时,它停止执行。暂停调试器时,显示所有线程都在等待 wait()
语句。
/**
* Executes all tasks in parallel, with completion handler called only when other tasks of same key are complete.
* For a given key, the order in which {@link #execute(Object, int, java.util.concurrent.Callable, Runnable, Runnable)} was called will be the order in which completion runnable will be called.
*/
public class DependentExecutor {
private final Executor executor;
private final Map<Object, Queue<DependentTask>> allTasks = new ArrayMap<>();
private final boolean enableDependency;
public DependentExecutor(boolean enableDependency, Executor executor) {
this.executor = executor;
this.enableDependency = enableDependency;
}
/**
* You should return true from the task on successful completion.
* If task returns false, then completion runnable wont be executed.
* <p/>
* This method will return false if tha task with this uniqueId already exists. Otherwise true is returned.
*
* @param key A non null key using which task dependency is decided. Tasks with same key are dependent.
* @param uniqueId If there is a task with this uniqueId already present, this task will be rejected
* @param task Optional. A long pending task to be performed or null if only completion is to be dependant.
* @param completionCallback A non null callback which will be serially executed for tasks with same key
* @param errorCallback If task returns false, then this callback will be invoked immediately (no dependency)
*/
public boolean execute(Object key, int uniqueId, Callable<Boolean> task, Runnable completionCallback, Runnable errorCallback) {
DependentTask queuedTask;
synchronized (allTasks) {
Queue<DependentTask> queue = allTasks.get(key);
for (Map.Entry<Object, Queue<DependentTask>> objectQueueEntry : allTasks.entrySet()) {
synchronized (objectQueueEntry.getValue()) {
Iterator<DependentTask> iterator = objectQueueEntry.getValue().iterator();
while (iterator.hasNext()) {
DependentTask dependentTask = iterator.next();
if (dependentTask.getUniqueId() == uniqueId) {
// no 2 tasks can have same uniqueID
return false;
}
}
}
}
if (queue == null && task == null) {
// this means we have no pending dependency as well as no task to perform. So only callback.
completionCallback.run();
return true;
} else if (queue == null) {
queue = new LinkedList<DependentTask>();
allTasks.put(key, queue);
}
if (!enableDependency) {
key = Math.random();
}
queuedTask = new DependentTask(key, uniqueId, queue, task, completionCallback, errorCallback);
queue.add(queuedTask);
}
executor.execute(queuedTask);
return true;
}
class DependentTask implements Runnable {
private final Queue<DependentTask> dependencyQueue;
private final Callable<Boolean> task;
private final Object key;
private final Runnable completionCallback;
private final Runnable errorCallback;
private final int uniqueId;
public DependentTask(Object key, int uniqueId, Queue<DependentTask> dependencyQueue, Callable<Boolean> task, Runnable completionCallback, Runnable errorCallback) {
this.uniqueId = uniqueId;
this.task = task;
this.dependencyQueue = dependencyQueue;
this.key = key;
this.completionCallback = completionCallback;
this.errorCallback = errorCallback;
}
public int getUniqueId() {
return uniqueId;
}
@Override
public void run() {
Boolean result = false;
try {
if (task != null) {
result = task.call();
} else {
result = true;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (result) {
synchronized (dependencyQueue) {
while (dependencyQueue.peek() != this) {
try {
dependencyQueue.wait(); // deadlock !!
} catch (InterruptedException e) {
}
}
}
completionCallback.run(); // by now we are the first element in the linked list. Lets call completion.
} else {
errorCallback.run(); // by now we are the first element in the linked list. Lets call error callback.
}
synchronized (dependencyQueue) {
dependencyQueue.remove(); //remove thyself
dependencyQueue.notifyAll();
}
// clean up of main map
synchronized (allTasks) {
if (dependencyQueue.isEmpty()) {
allTasks.remove(key);
}
}
}
}
}
}
问题#1
您从队列中删除 "self" 的逻辑有误。您无条件地从队列中移除,并且总是从顶部移除(即任务实际上并没有从队列中移除自己,它总是移除顶部),但是检查顶部是否实际上是任务的一部分是有条件的 - 并且仅在实施任务返回 true
.
因此,任何时候执行任务 returns false
,或因异常而失败,任务将从队列顶部删除一些东西,很有可能它不是自己.因此,被移除的任务仍然是运行,永远不会找到自己在顶部,并且会无休止地等待。
问题#2
您正在同步之外修改 dependencyQueue。您的队列实现是 LinkedList,它不是线程安全的。你应该使用:
synchronized (queue) {
queue.add(queuedTask);
}
当您将新任务添加到队列时。
最有可能发生的事情是 add()
与 remove()
同时调用,这会破坏列表的内部状态。 add()
实际上失败了(列表不包含添加的元素),因此相应的线程永远不会在列表中找到它自己。如果您可以轻松地重现它,您可以通过连接调试器并评估队列中的值来测试它 - 您会看到 "hung" 线程甚至不存在。