非阻塞限速 ThreadPoolExecutor

Non-blocking rate-limited ThreadPoolExecutor

我正在通过多个连接同时访问一台 HTTP 服务器。当服务器表明请求到达得过快时,我希望对客户端进行限速。我不想更换正在使用的 HTTP 库,而是想对它进行扩展。

为此,我该如何实现满足以下约束的 ThreadPoolExecutor?



  • 不可能有完全非阻塞(non-blocking)的解决方案。即使是 ScheduledThreadPoolExecutor,也至少会保留一个线程,等待队列返回新的任务。
  • ThreadPoolExecutor 位于 BlockingQueue 之上。当没有剩余任务时,它会阻塞在 BlockingQueue.take() 上。
  • 解决方案由 3 个部分组成:
  1. 速率限制器。
  2. 一个 BlockingQueue,它会隐藏元素,直到速率限制器允许使用它们。
  3. 一个位于该 BlockingQueue 之上的 ThreadPoolExecutor。


我基于令牌桶(Token Bucket)算法实现了一个速率限制器,以克服 RateLimiter 的局限性。源代码可以在这里找到。



我实现了一个 BlockingDeque(它扩展了 BlockingQueue),因为将来我想尝试把失败的任务放回队列的前端。


import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import static org.bitbucket.cowwoc.requirements.core.Requirements.requireThat;

/**
 * A blocking deque of elements, in which an element can only be taken when the deque-wide delay has expired.
 * <p>
 * The optional capacity bound constructor argument serves as a way to prevent excessive expansion. The capacity, if
 * unspecified, is equal to {@link Integer#MAX_VALUE}.
 * <p>
 * Even though methods that take elements, such as {@code take} or {@code poll}, respect the deque-wide delay, the
 * remaining methods treat them as normal elements. For example, the {@code size} method returns the count of both
 * expired and unexpired elements.
 * <p>
 * This class and its iterator implement all of the <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * @param <E> the type of elements in the deque
 * @author Gili Tzabari
 */
public final class RateLimitedBlockingDeque<E> implements BlockingDeque<E>
    private final int capacity;
    private final LinkedBlockingDeque<E> delegate;
    private final Bucket rateLimit = new Bucket();

     * Creates a {@code RateLimitedBlockingDeque} with a capacity of {@link Integer#MAX_VALUE}.
    public RateLimitedBlockingDeque()
        this.capacity = Integer.MAX_VALUE;
        this.delegate = new LinkedBlockingDeque<>();

     * Creates a {@code RateLimitedBlockingDeque} with the given (fixed) capacity.
     * @param capacity the capacity of this deque
     * @throws IllegalArgumentException if {@code capacity} is less than 1
    public RateLimitedBlockingDeque(int capacity)
        this.capacity = capacity;
        this.delegate = new LinkedBlockingDeque<>(capacity);

     * @return the capacity of the deque
    public int getCapacity()
        return capacity;

     * Indicates the rate at which elements may be taken from the queue.
     * @param elements the number of elements that may be taken per {@code period}
     * @param period   indicates how often elements may be taken
     * @throws NullPointerException     if {@code period} is null
     * @throws IllegalArgumentException if the requested rate is greater than element per nanosecond
    public void setRate(long elements, Duration period)
        synchronized (rateLimit)
            Limit newLimit = new Limit(elements, period, 0, Long.MAX_VALUE);
            if (rateLimit.getLimits().isEmpty())
                Limit oldLimit = rateLimit.getLimits().iterator().next();
                rateLimit.replaceLimit(oldLimit, newLimit);

     * Allows consumption of elements without limit.
    public void removeRate()
        synchronized (rateLimit)

    public void addFirst(E e)

    public void addLast(E e)

    public boolean offerFirst(E e)
        return delegate.offerFirst(e);

    public boolean offerLast(E e)
        return delegate.offerLast(e);

    public void putFirst(E e) throws InterruptedException

    public void putLast(E e) throws InterruptedException

    public boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException
        return delegate.offerFirst(e, timeout, unit);

    public boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException
        return delegate.offerLast(e, timeout, unit);

    public E removeFirst()
        if (rateLimit.tryConsume())
            return delegate.removeFirst();
        throw new NoSuchElementException();

    public E removeLast()
        if (rateLimit.tryConsume())
            return delegate.removeLast();
        throw new NoSuchElementException();

    public E pollFirst()
        if (rateLimit.tryConsume())
            return delegate.pollFirst();
        return null;

    public E pollLast()
        if (rateLimit.tryConsume())
            return delegate.pollLast();
        return null;

    public E takeFirst() throws InterruptedException
        return delegate.takeFirst();

    public E takeLast() throws InterruptedException
        return delegate.takeLast();

    public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException
        if (rateLimit.consume(1, timeout, unit))
            return delegate.pollFirst(timeout, unit);
        return null;

    public E pollLast(long timeout, TimeUnit unit) throws InterruptedException
        if (rateLimit.consume(1, timeout, unit))
            return delegate.pollLast(timeout, unit);
        return null;

    public E getFirst()
        return delegate.getFirst();

    public E getLast()
        return delegate.getLast();

    public E peekFirst()
        return delegate.peekFirst();

    public E peekLast()
        return delegate.peekLast();

    public boolean removeFirstOccurrence(Object o)
        return delegate.removeFirstOccurrence(o);

    public boolean removeLastOccurrence(Object o)
        return delegate.removeLastOccurrence(o);

    public boolean add(E e)
        return delegate.add(e);

    public boolean offer(E e)
        return delegate.offer(e);

    public void put(E e) throws InterruptedException

    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
        return delegate.offer(e, timeout, unit);

    public E remove()
        return removeFirst();

    public E poll()
        return pollFirst();

    public E take() throws InterruptedException
        return takeFirst();

    public E poll(long timeout, TimeUnit unit) throws InterruptedException
        return pollFirst(timeout, unit);

    public E element()
        return getFirst();

    public E peek()
        return peekFirst();

    public int remainingCapacity()
        return delegate.remainingCapacity();

    public int drainTo(Collection<? super E> c)
        int result = 0;
        while (true)
            E next = pollFirst();
            if (next == null)
        return result;

    public int drainTo(Collection<? super E> c, int maxElements)
        int result = 0;
            E next = pollFirst();
            if (next == null)
        while (result < maxElements);
        return result;

    public void push(E e)

    public E pop()
        return removeFirst();

    public boolean remove(Object o)
        return removeFirstOccurrence(o);

    public int size()
        return delegate.size();

    public boolean contains(Object o)
        return delegate.contains(o);

    public Object[] toArray()
        return delegate.toArray();

    public <T> T[] toArray(T[] a)
        return delegate.toArray(a);

    public String toString()
        return delegate.toString();

    public void clear()

    public Iterator<E> iterator()
        return wrap(delegate.iterator());

     * @param delegateIterator the iterator to delegate to
     * @return an iterator that respects the rate-limit
    private Iterator<E> wrap(Iterator<E> delegateIterator)
        return new Iterator<E>()
            private E previousElement = null;

            public boolean hasNext()
                return delegateIterator.hasNext();

            public E next()
                return delegateIterator.next();

            public void remove()
                if (previousElement == null)
                    throw new IllegalStateException("next() not invoked, or remove() already invoked");
                catch (InterruptedException e)
                    throw new IllegalStateException(e);
                previousElement = null;

    public Iterator<E> descendingIterator()
        return wrap(delegate.descendingIterator());

    public boolean addAll(Collection<? extends E> c)
        requireThat("c", c).isNotNull().isNotEqualTo("this", this);
        boolean modified = false;
        for (E e: c)
            if (add(e))
                modified = true;
        return modified;

    public boolean isEmpty()
        return delegate.isEmpty();

    public boolean containsAll(Collection<?> c)
        return delegate.containsAll(c);

    public boolean removeAll(Collection<?> c)
        Iterator<E> i = iterator();
        boolean modified = true;
        while (i.hasNext())
            E element = i.next();
            if (c.contains(element))
                modified = true;
        return modified;

    public boolean retainAll(Collection<?> c)
        Iterator<E> i = iterator();
        boolean modified = true;
        while (i.hasNext())
            E element = i.next();
            if (!c.contains(element))
                modified = true;
        return modified;

    public int hashCode()
        return delegate.hashCode();

    public boolean equals(Object obj)
        return delegate.equals(obj);