非阻塞延迟队列,Java
Nonblocking DelayQueue, Java
我正在使用 Java 的 DelayQueue 在第二次延迟后调度事件。但问题是,在重负载下,我在 DelayQueue 上的消费者会阻塞,直到来自另一个线程的大量 offer() 操作消失。
有人知道Java中的非阻塞延迟队列实现吗?
我认为您误解了 DelayQueue
API 或线程调度的工作方式。
如果你想要真正的非阻塞队列操作,那么 DelayQueue
已经提供了。例如 poll()
将立即 return 队列条目或 null
。它不会阻塞调用线程。 offer(...)
方法是无阻塞插入的等效方法。
另一方面,如果您实际上是在说某些线程正在 "starved",那么您对此无能为力。 Java线程调度不是"fair":
如果您有许多 运行 可用的线程,则不会尝试为每个线程提供相似的 运行 时间。
如果有多个线程在等待原始锁或通知,那么调度程序将不会尝试选择一个线程"fairly".
如果您的线程数量多于 运行 核心数量,线程饥饿的可能性就更大。
最好的解决方案是设计您的算法,即使线程调度不公平也无关紧要。没关系;参见 Is a DelayQueue without fairness problematic?。
郑重声明,我不知道 DelayQueue
宣传公平安排的替代品。
不幸的是,DelayQueue 正在阻塞队列并且它不会return 立即写入,因为它使用了锁。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
所以如果有很多线程写入它,正如 Stephen 所说,您对此无能为力。
我通过使用 ConcurrentSkipListSet 和 DelayedElement 解决了这个问题。
public class DelayedElement implements Comparable<DelayedElement> {
private final Long initTime;
private final String msgId;
public DelayedElement(Long initTime, String msgId) {
this.initTime = initTime;
this.msgId = msgId;
}
@Override
public int hashCode() {
int hash = 5;
hash = 29 * hash + Objects.hashCode(this.initTime);
hash = 29 * hash + Objects.hashCode(this.msgId);
return hash;
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final DelayedElement other = (DelayedElement) obj;
if (!Objects.equals(this.initTime, other.initTime)) {
return false;
}
if (!Objects.equals(this.msgId, other.msgId)) {
return false;
}
return true;
}
@Override
public int compareTo(DelayedElement o) {
return -o.initTime.compareTo(initTime);
}
}
在我的制作人线程中,我添加每个元素都有一秒的延迟。
在我的消费者线程中,我只是读取具有一秒延迟的元素,例如:
long diff = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(1000L);
NavigableSet<DelayedElement> set = queue.headSet(
new DelayedElement(diff, "", null));
//further processing goes here
通过这种方式,我实现了非阻塞特性,可以安全地全速写入和读取集合。
我正在使用 Java 的 DelayQueue 在第二次延迟后调度事件。但问题是,在重负载下,我在 DelayQueue 上的消费者会阻塞,直到来自另一个线程的大量 offer() 操作消失。
有人知道Java中的非阻塞延迟队列实现吗?
我认为您误解了 DelayQueue
API 或线程调度的工作方式。
如果你想要真正的非阻塞队列操作,那么 DelayQueue
已经提供了。例如 poll()
将立即 return 队列条目或 null
。它不会阻塞调用线程。 offer(...)
方法是无阻塞插入的等效方法。
另一方面,如果您实际上是在说某些线程正在 "starved",那么您对此无能为力。 Java线程调度不是"fair":
如果您有许多 运行 可用的线程,则不会尝试为每个线程提供相似的 运行 时间。
如果有多个线程在等待原始锁或通知,那么调度程序将不会尝试选择一个线程"fairly".
如果您的线程数量多于 运行 核心数量,线程饥饿的可能性就更大。
最好的解决方案是设计您的算法,即使线程调度不公平也无关紧要。没关系;参见 Is a DelayQueue without fairness problematic?。
郑重声明,我不知道 DelayQueue
宣传公平安排的替代品。
不幸的是,DelayQueue 正在阻塞队列并且它不会return 立即写入,因为它使用了锁。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
所以如果有很多线程写入它,正如 Stephen 所说,您对此无能为力。
我通过使用 ConcurrentSkipListSet 和 DelayedElement 解决了这个问题。
public class DelayedElement implements Comparable<DelayedElement> {
private final Long initTime;
private final String msgId;
public DelayedElement(Long initTime, String msgId) {
this.initTime = initTime;
this.msgId = msgId;
}
@Override
public int hashCode() {
int hash = 5;
hash = 29 * hash + Objects.hashCode(this.initTime);
hash = 29 * hash + Objects.hashCode(this.msgId);
return hash;
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final DelayedElement other = (DelayedElement) obj;
if (!Objects.equals(this.initTime, other.initTime)) {
return false;
}
if (!Objects.equals(this.msgId, other.msgId)) {
return false;
}
return true;
}
@Override
public int compareTo(DelayedElement o) {
return -o.initTime.compareTo(initTime);
}
}
在我的制作人线程中,我添加每个元素都有一秒的延迟。 在我的消费者线程中,我只是读取具有一秒延迟的元素,例如:
long diff = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(1000L);
NavigableSet<DelayedElement> set = queue.headSet(
new DelayedElement(diff, "", null));
//further processing goes here
通过这种方式,我实现了非阻塞特性,可以安全地全速写入和读取集合。