非阻塞延迟队列,Java

Nonblocking DelayQueue, Java

我正在使用 Java 的 DelayQueue 在第二次延迟后调度事件。但问题是,在重负载下,我在 DelayQueue 上的消费者会阻塞,直到来自另一个线程的大量 offer() 操作消失。

有人知道Java中的非阻塞延迟队列实现吗?

我认为您误解了 DelayQueue API 或线程调度的工作方式。

如果你想要真正的非阻塞队列操作,那么 DelayQueue 已经提供了。例如 poll() 将立即 return 队列条目或 null。它不会阻塞调用线程。 offer(...) 方法是无阻塞插入的等效方法。

另一方面,如果您实际上是在说某些线程正在 "starved",那么您对此无能为力。 Java线程调度不是"fair":

  • 如果您有许多 运行 可用的线程,则不会尝试为每个线程提供相似的 运行 时间。

  • 如果有多个线程在等待原始锁或通知,那么调度程序将不会尝试选择一个线程"fairly".

如果您的线程数量多于 运行 核心数量,线程饥饿的可能性就更大。

最好的解决方案是设计您的算法,即使线程调度不公平也无关紧要。没关系;参见 Is a DelayQueue without fairness problematic?


郑重声明,我不知道 DelayQueue 宣传公平安排的替代品。

不幸的是,DelayQueue 正在阻塞队列并且它不会return 立即写入,因为它使用了锁。

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

所以如果有很多线程写入它,正如 Stephen 所说,您对此无能为力。

我通过使用 ConcurrentSkipListSet 和 DelayedElement 解决了这个问题。

public class DelayedElement implements Comparable<DelayedElement> {

private final Long initTime;
private final String msgId;

public DelayedElement(Long initTime, String msgId) {
    this.initTime = initTime;
    this.msgId = msgId;
}             

@Override
public int hashCode() {
    int hash = 5;
    hash = 29 * hash + Objects.hashCode(this.initTime);
    hash = 29 * hash + Objects.hashCode(this.msgId);
    return hash;
}

@Override
public boolean equals(Object obj) {
    if (obj == null) {
        return false;
    }
    if (getClass() != obj.getClass()) {
        return false;
    }
    final DelayedElement other = (DelayedElement) obj;
    if (!Objects.equals(this.initTime, other.initTime)) {
        return false;
    }
    if (!Objects.equals(this.msgId, other.msgId)) {
        return false;
    }
    return true;
}
@Override
public int compareTo(DelayedElement o) {
    return -o.initTime.compareTo(initTime);
    }
}

在我的制作人线程中,我添加每个元素都有一秒的延迟。 在我的消费者线程中,我只是读取具有一秒延迟的元素,例如:

 long diff = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(1000L);
 NavigableSet<DelayedElement> set = queue.headSet(
 new DelayedElement(diff, "", null));
 //further processing goes here     

通过这种方式,我实现了非阻塞特性,可以安全地全速写入和读取集合。