高竞争的消费者-生产者
Consumer-Producer with high contention
我想知道实现多个生产者 - 单个消费者场景的最佳机制是什么,在这种情况下我必须保持当前未处理请求的数量是最新的。
我的第一个想法是使用 ConcurrentLinkedQueue:
public class SomeQueueAbstraction {
private Queue<SomeObject> concurrentQueue = new ConcurrentLinkedQueue<>();
private int size;
public void add(Object request) {
SomeObject object = convertIncomingRequest(request);
concurrentQueue.add(object);
size++;
}
public SomeObject getHead() {
SomeObject object = concurrentQueue.poll();
size--;
}
// other methods
问题在于我必须在 add
和 size ++
以及 poll
和 size--
上显式同步,以便始终准确 size
这使得 ConccurentLinkedQueue
一开始就毫无意义。
在保持数据一致性的同时实现尽可能好的性能的最佳方法是什么?
我应该改用 ArrayDequeue
并显式同步还是有更好的方法来实现此目的?
这里有点类似 question/answer:
java.util.ConcurrentLinkedQueue
讨论了 ConcurrentLinkedQueue
上的复合操作如何自然地不是原子的,但没有直接的答案对于给定的场景什么是最佳选择。
注意:我正在明确计算大小,因为固有 .size() 方法的时间复杂度为 O(n)。
注意 2:我还担心我没有明确编写的 getSize() 方法会增加更多的争用开销。可以调用的比较频繁
我正在寻找处理多个生产者的最有效方法 - 频繁调用 getSize() 的单个消费者。
替代建议:如果 SomeObject 结构中有 elementId,我可以从 ConcurrentLinkedQueue.poll() 获取当前大小,并且只需要在生成此类 id 的机制中进行锁定。 Add 和 get 现在可以在没有额外锁定的情况下正确使用。作为替代方案,这种票价如何?
您可以使用显式锁,这意味着您可能不需要并发队列。
public class SomeQueueAbstraction {
private Queue<SomeObject> queue = new LinkedList<>();
private volatile int size;
private Object lock = new Object();
public void add(Object request) {
SomeObject object = convertIncomingRequest(request);
synchronized(lock) {
queue.add(object);
size++;
}
}
public SomeObject getHead() {
SomeObject object = null;
synchronized(lock) {
object = queue.poll();
size--;
}
return object;
}
public int getSize() {
synchronized(lock) {
return size;
}
}
// other methods
}
这样,adding/removing 个元素 to/from 队列和更新 size
将安全完成。
所以要求是报告最新的当前未处理请求数。而且这经常被要求,这确实使 ConcurrentLinkedQueue.size()
不合适。
这可以使用 AtomicInteger 来完成:速度很快,并且总是尽可能接近当前未处理请求的数量。
这里是一个例子,注意一些小的更新以确保报告的大小是准确的:
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class SomeQueueAbstraction {
private final Queue<SomeObject> concurrentQueue = new ConcurrentLinkedQueue<>();
private final AtomicInteger size = new AtomicInteger();
public boolean add(Object request) {
SomeObject object = convertIncomingRequest(request);
if (concurrentQueue.add(object)) {
size.incrementAndGet();
return true;
}
return false;
}
public SomeObject remove() {
SomeObject object = concurrentQueue.poll();
if (object != null) {
size.decrementAndGet();
}
return object;
}
public int getSize() { return size.get(); }
private SomeObject convertIncomingRequest(Object request) {
return new SomeObject(getSize());
}
class SomeObject {
int id;
SomeObject(int id) { this.id = id; }
}
}
我想知道实现多个生产者 - 单个消费者场景的最佳机制是什么,在这种情况下我必须保持当前未处理请求的数量是最新的。
我的第一个想法是使用 ConcurrentLinkedQueue:
public class SomeQueueAbstraction {
private Queue<SomeObject> concurrentQueue = new ConcurrentLinkedQueue<>();
private int size;
public void add(Object request) {
SomeObject object = convertIncomingRequest(request);
concurrentQueue.add(object);
size++;
}
public SomeObject getHead() {
SomeObject object = concurrentQueue.poll();
size--;
}
// other methods
问题在于我必须在 add
和 size ++
以及 poll
和 size--
上显式同步,以便始终准确 size
这使得 ConccurentLinkedQueue
一开始就毫无意义。
在保持数据一致性的同时实现尽可能好的性能的最佳方法是什么?
我应该改用 ArrayDequeue
并显式同步还是有更好的方法来实现此目的?
这里有点类似 question/answer:
java.util.ConcurrentLinkedQueue
讨论了 ConcurrentLinkedQueue
上的复合操作如何自然地不是原子的,但没有直接的答案对于给定的场景什么是最佳选择。
注意:我正在明确计算大小,因为固有 .size() 方法的时间复杂度为 O(n)。
注意 2:我还担心我没有明确编写的 getSize() 方法会增加更多的争用开销。可以调用的比较频繁
我正在寻找处理多个生产者的最有效方法 - 频繁调用 getSize() 的单个消费者。
替代建议:如果 SomeObject 结构中有 elementId,我可以从 ConcurrentLinkedQueue.poll() 获取当前大小,并且只需要在生成此类 id 的机制中进行锁定。 Add 和 get 现在可以在没有额外锁定的情况下正确使用。作为替代方案,这种票价如何?
您可以使用显式锁,这意味着您可能不需要并发队列。
public class SomeQueueAbstraction {
private Queue<SomeObject> queue = new LinkedList<>();
private volatile int size;
private Object lock = new Object();
public void add(Object request) {
SomeObject object = convertIncomingRequest(request);
synchronized(lock) {
queue.add(object);
size++;
}
}
public SomeObject getHead() {
SomeObject object = null;
synchronized(lock) {
object = queue.poll();
size--;
}
return object;
}
public int getSize() {
synchronized(lock) {
return size;
}
}
// other methods
}
这样,adding/removing 个元素 to/from 队列和更新 size
将安全完成。
所以要求是报告最新的当前未处理请求数。而且这经常被要求,这确实使 ConcurrentLinkedQueue.size()
不合适。
这可以使用 AtomicInteger 来完成:速度很快,并且总是尽可能接近当前未处理请求的数量。
这里是一个例子,注意一些小的更新以确保报告的大小是准确的:
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class SomeQueueAbstraction {
private final Queue<SomeObject> concurrentQueue = new ConcurrentLinkedQueue<>();
private final AtomicInteger size = new AtomicInteger();
public boolean add(Object request) {
SomeObject object = convertIncomingRequest(request);
if (concurrentQueue.add(object)) {
size.incrementAndGet();
return true;
}
return false;
}
public SomeObject remove() {
SomeObject object = concurrentQueue.poll();
if (object != null) {
size.decrementAndGet();
}
return object;
}
public int getSize() { return size.get(); }
private SomeObject convertIncomingRequest(Object request) {
return new SomeObject(getSize());
}
class SomeObject {
int id;
SomeObject(int id) { this.id = id; }
}
}