如何实现异步队列?
How to implement asynchronous queue?
给定以下队列变化:
interface AsyncQueue<T> {
//add new element to the queue
void add(T elem);
//request single element from the queue via callback
//callback will be called once for single polled element when it is available
//so, to request multiple elements, poll() must be called multiple times with (possibly) different callbacks
void poll(Consumer<T> callback);
}
我发现我不知道如何使用 java.util.concurrent 原语来实现它!所以问题是:
- 使用 java.util.concurrent 包实现它的正确方法是什么?
- 是否可以 w/o 使用额外的线程池来做到这一点?
您的 AsyncQueue
与 BlockingQueue
非常相似,例如 ArrayBlockingQueue
。返回的 Future
将简单地委托给 ArrayBlockingQueue
方法。例如,Future.get
会调用 blockingQueue.poll
。
至于你的更新,我假设调用 add
的线程应该在有一个等待时调用回调?如果是这样,为元素创建一个队列和为回调创建一个队列是一项简单的任务。
- 添加时,检查是否有回调等待,然后调用它,否则将元素放入元素队列
- 轮询时,检查是否有元素在等待,然后用该元素调用回调,否则将回调放入回调队列
代码大纲:
class AsyncQueue<E> {
Queue<Consumer<E>> callbackQueue = new LinkedList<>();
Queue<E> elementQueue = new LinkedList<>();
public synchronized void add(E e) {
if (callbackQueue.size() > 0)
callbackQueue.remove().accept(e);
else
elementQueue.offer(e);
}
public synchronized void poll(Consumer<E> c) {
if (elementQueue.size() > 0)
c.accept(elementQueue.remove());
else
callbackQueue.offer(c);
}
}
给定以下队列变化:
interface AsyncQueue<T> {
//add new element to the queue
void add(T elem);
//request single element from the queue via callback
//callback will be called once for single polled element when it is available
//so, to request multiple elements, poll() must be called multiple times with (possibly) different callbacks
void poll(Consumer<T> callback);
}
我发现我不知道如何使用 java.util.concurrent 原语来实现它!所以问题是:
- 使用 java.util.concurrent 包实现它的正确方法是什么?
- 是否可以 w/o 使用额外的线程池来做到这一点?
您的 AsyncQueue
与 BlockingQueue
非常相似,例如 ArrayBlockingQueue
。返回的 Future
将简单地委托给 ArrayBlockingQueue
方法。例如,Future.get
会调用 blockingQueue.poll
。
至于你的更新,我假设调用 add
的线程应该在有一个等待时调用回调?如果是这样,为元素创建一个队列和为回调创建一个队列是一项简单的任务。
- 添加时,检查是否有回调等待,然后调用它,否则将元素放入元素队列
- 轮询时,检查是否有元素在等待,然后用该元素调用回调,否则将回调放入回调队列
代码大纲:
class AsyncQueue<E> {
Queue<Consumer<E>> callbackQueue = new LinkedList<>();
Queue<E> elementQueue = new LinkedList<>();
public synchronized void add(E e) {
if (callbackQueue.size() > 0)
callbackQueue.remove().accept(e);
else
elementQueue.offer(e);
}
public synchronized void poll(Consumer<E> c) {
if (elementQueue.size() > 0)
c.accept(elementQueue.remove());
else
callbackQueue.offer(c);
}
}