使用 ConcurrentLinkedQueue<T> 序列化访问 Collection

Serialized access to Collection with the use of ConcurrentLinkedQueue<T>

我有以下关于 Java 7 ConcurrentLinkedQueue 的问题。让我们假设我有以下 class:

public class Blah {
    private ConcurrentLinkedQueue<String> queue;

    public Blah() {
        queue = new ConcurrentLinkedQueue<String>();
    }

    public void produce(String action, String task) throws InterruptedException {
        synchronized(queue) {
            while(queue.size() >= 8) 
                queue.wait();
            queue.add(action + "#" + task);
            queue.notifyAll();
        }
    }

    public void consume() throws InterruptedException {
        synchronized(queue) {
            while(queue.size() <= 0)
                queue.wait();
            String element = queue.poll();
            StringTokenizer strTok = new StringTokenizer(element, "#");
            String action = strTok.nextToken();
            String task = strTok.nextToken();
            /**
             * Operate on request
             */
            queue.notifyAll();
        }
    }
}

生产和消费函数将由并发线程调用,以便 generate/remove 线程 for/from 列表。我实现了前面的函数 consume() 和 produce() 以便序列化队列中的 addition/removal 个元素。以上是必需的,还是 ConcurrentLinkedQueue 负责?我问是因为我不想降低我的代码的性能。

谢谢, 尼克

您确实降低了代码的性能,因为您使用的是来自同步机制的 'slowest' 内置同步。

您有 BlockingQueue 的完美用例。它为您提供 puttake 操作,这些操作会阻塞直到 space/element 可用。

ConcurrentLinkedQueue 只提供线程安全,不提供同步。这意味着您可以在多线程应用程序中安全地 add/remove 队列中的元素,但它没有为您提供等待 space/elemetns 的机制,因此您为此目的正确使用了 wait()、notify() 方法(尽管您也可以在任何其他共享对象上进行同步,但不必是那个队列)。在 Java 中使用 Lock 而不是 synchronized 要快得多。

假设 produceconsume 将被独立调用,您可能会以无限循环结束,因为两者都试图锁定队列对象,因此一旦 consume 没有' 找到它永远不会释放锁的任何元素,阻止 produce 添加元素。同步逻辑可能应该在调用 consumeproduce.

的方法中实现

TL;DR:您正在使用 Queue 专门设计为非阻塞的 BlockingQueue

您的代码可以重写为:

public class Blah {
    private BlockingQueue<String> queue;

    public Blah() {
        queue = new LinkedBlockingQueue<>(8);
    }

    public void produce(String action, String task) throws InterruptedException {
        while (true) {
            queue.put(action + "#" + task);
        }
    }

    public void consume() throws InterruptedException {
        while (true) {
            final String[] data = queue.take().split("#");
            final String action = data[0];
            final String task = data[1];
        }
    }
}

BlockingQueue8 个元素为界。 put 如果队列已满将阻塞。 take 如果队列为空,将阻塞。

不需要同步。

此外,StringTokenizer 已弃用。我建议您使用 class 类似的东西:

public class Data {
    private final String action;
    private final String task;

    public Data(final String action, final String task) {
        this.action = action;
        this.task = task;
    }

    public String getAction() {
        return action;
    }

    public String getTask() {
        return task;
    }
}

交换数据。没有理由创建和解析 Strings.