如何设置 Java DelayQueue 的容量

How to set capacity for Java DelayQueue

我使用 LinkedBlockingQueue 队列为 TCP/IP 事件传输实现 producer-consumer pattern,我使用 boolean offer(e) 这意味着一旦队列达到其容量,新的传入的事件将被忽略(丢弃)并且它 returns false.

现在我必须将事件保持一段可配置的时间(比如 2 秒),所以我决定使用 DelayQueue 它可以保持元素并仅在时间到期时释放元素。

不幸的是,DelayQueue 无界的 。我想知道是否有办法为 DelayQueuedelayQueue.size() == CAPACITY 设置容量总是可靠的?

这是一个相当大的问题,因为我们需要访问 DelayQueue 使用的内部锁,如果我们要对其进行子类化或委托给它,并且该锁在 DelayQueue 中是私有的。

我们不能使用第二个锁,因为这会导致 take 出现问题。您也许可以自己实现它,但是这样做您已经完成了自己的 DelayQueue 实现的一半以上,所以这可能不是您想要的。

我们可以使用反射访问锁。但是请注意,这不是最好的想法,因为它依赖于 DelayQueue 的实现细节。它可能无法在所有 JRE 上运行,如果您更改 运行 所在的 JRE 版本,它甚至可能会中断。也就是说,我认为这是解决您的问题的绝对最简单的方法,尽管有点脏。

/**
 * Bounded implementation of {@link DelayQueue}. This implementation uses
 * reflection to access the internal lock in {@link DelayQueue} so might
 * only work on the Oracle 1.8 JRE.
 * @param <T>
 */
public class BoundedDelayQueue<T extends Delayed> extends DelayQueue<T> {

    // Lock used to synchronize every operation
    private final transient ReentrantLock lock;

    // The limit
    private final int limit;

    BoundedDelayQueue(int limit) {
        try {
            // Grab the private lock in DelayQueue using reflection so we can use it
            // to gain exclusive access outside of DelayQueue
            Field lockField = DelayQueue.class.getDeclaredField("lock");
            lockField.setAccessible(true);
            this.lock = (ReentrantLock) lockField.get(this);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new Error("Could not access lock field", e);
        }
        this.limit = limit;
    }

    @Override
    // All the various ways of adding items in DelayQueue delegate to
    // offer, so we only have to override it and not the other methods
    public boolean offer(final T t) {
        // Lock the lock
        lock.lock();
        try {
            // Check the size limit
            if(size() == limit) {
                return false;
            }
            // Forward to superclass
            return super.offer(t);
        } finally {
            lock.unlock();
        }
    }
}

请注意,这不会实现 offer 超时,如果您需要,则必须自己完成。